From 2e60343e3f029ec24f3f3502c5ac764d9cf3f28d Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 30 Oct 2024 15:00:00 -0700 Subject: [PATCH] (otelarrowreceiver): Fix incorrect OTLP admission control behavior (#36078) --- .chloggen/otelarrow-otlp-admit-fix.yaml | 27 +++ internal/otelarrow/admission/boundedqueue.go | 12 +- .../otelarrow/admission/boundedqueue_test.go | 12 +- receiver/otelarrowreceiver/go.mod | 2 +- .../internal/arrow/arrow_test.go | 9 +- .../otelarrowreceiver/internal/logs/otlp.go | 22 +-- .../internal/logs/otlp_test.go | 180 +++++++++++------ .../internal/metrics/otlp.go | 16 +- .../internal/metrics/otlp_test.go | 183 +++++++++++------- .../otelarrowreceiver/internal/trace/otlp.go | 18 +- .../internal/trace/otlp_test.go | 168 ++++++++++------ receiver/otelarrowreceiver/otelarrow.go | 2 +- 12 files changed, 420 insertions(+), 231 deletions(-) create mode 100644 .chloggen/otelarrow-otlp-admit-fix.yaml diff --git a/.chloggen/otelarrow-otlp-admit-fix.yaml b/.chloggen/otelarrow-otlp-admit-fix.yaml new file mode 100644 index 000000000000..ee60a5143a43 --- /dev/null +++ b/.chloggen/otelarrow-otlp-admit-fix.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Avoid breaking telemetry when admission control fails in OTLP handlers. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. 
+# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go index ea3f255db551..7aeeeb8b3457 100644 --- a/internal/otelarrow/admission/boundedqueue.go +++ b/internal/otelarrow/admission/boundedqueue.go @@ -10,12 +10,16 @@ import ( "github.com/google/uuid" orderedmap "github.com/wk8/go-ordered-map/v2" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" + grpccodes "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) -var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters") +var ErrTooManyWaiters = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data") +var ErrRequestTooLarge = status.Error(grpccodes.InvalidArgument, "rejecting request, request is too large") type BoundedQueue struct { maxLimitBytes int64 @@ -33,12 +37,12 @@ type waiter struct { ID uuid.UUID } -func NewBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { +func NewBoundedQueue(ts component.TelemetrySettings, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue { return &BoundedQueue{ maxLimitBytes: maxLimitBytes, maxLimitWaiters: maxLimitWaiters, waiters: orderedmap.New[uuid.UUID, waiter](), - tracer: tp.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), + tracer: 
ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), } } @@ -47,7 +51,7 @@ func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) { defer bq.lock.Unlock() if pendingBytes > bq.maxLimitBytes { // will never succeed - return false, fmt.Errorf("rejecting request, request size larger than configured limit") + return false, ErrRequestTooLarge } if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go index e0c4ac471f10..55a86d5f2e40 100644 --- a/internal/otelarrow/admission/boundedqueue_test.go +++ b/internal/otelarrow/admission/boundedqueue_test.go @@ -11,10 +11,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" ) @@ -39,7 +39,7 @@ func abs(x int64) int64 { return x } -var noopTraces = noop.NewTracerProvider() +var noopTelemetry = componenttest.NewNopTelemetrySettings() func TestAcquireSimpleNoWaiters(t *testing.T) { maxLimitBytes := 1000 @@ -47,7 +47,7 @@ func TestAcquireSimpleNoWaiters(t *testing.T) { numRequests := 40 requestSize := 21 - bq := NewBoundedQueue(noopTraces, int64(maxLimitBytes), int64(maxLimitWaiters)) + bq := NewBoundedQueue(noopTelemetry, int64(maxLimitBytes), int64(maxLimitWaiters)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -99,7 +99,7 @@ func TestAcquireBoundedWithWaiters(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - bq := NewBoundedQueue(noopTraces, tt.maxLimitBytes, tt.maxLimitWaiters) + bq := NewBoundedQueue(noopTelemetry, tt.maxLimitBytes, tt.maxLimitWaiters) var blockedRequests int64 
numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked) @@ -160,8 +160,10 @@ func TestAcquireContextCanceled(t *testing.T) { exp := tracetest.NewInMemoryExporter() tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + ts := noopTelemetry + ts.TracerProvider = tp - bq := NewBoundedQueue(tp, int64(maxLimitBytes), int64(maxLimitWaiters)) + bq := NewBoundedQueue(ts, int64(maxLimitBytes), int64(maxLimitWaiters)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) var errs error diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index ca6b3314f106..42e6763677e8 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -25,6 +25,7 @@ require ( go.opentelemetry.io/collector/receiver v0.112.0 go.opentelemetry.io/otel v1.31.0 go.opentelemetry.io/otel/metric v1.31.0 + go.opentelemetry.io/otel/sdk v1.31.0 go.opentelemetry.io/otel/sdk/metric v1.31.0 go.opentelemetry.io/otel/trace v1.31.0 go.uber.org/goleak v1.3.0 @@ -81,7 +82,6 @@ require ( go.opentelemetry.io/collector/pipeline v0.112.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0 // indirect - go.opentelemetry.io/otel/sdk v1.31.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/sync v0.8.0 // indirect diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 9a11185ee24a..64a6a5c7b448 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -36,7 +36,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/otel/trace/noop" "go.uber.org/mock/gomock" 
"go.uber.org/zap/zaptest" "golang.org/x/net/http2/hpack" @@ -50,10 +49,10 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" ) -var noopTraces = noop.NewTracerProvider() +var noopTelemetry = componenttest.NewNopTelemetrySettings() func defaultBQ() *admission.BoundedQueue { - return admission.NewBoundedQueue(noopTraces, int64(100000), int64(10)) + return admission.NewBoundedQueue(noopTelemetry, int64(100000), int64(10)) } type compareJSONTraces struct{ ptrace.Traces } @@ -490,10 +489,10 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) { var bq *admission.BoundedQueue if tt.rejected { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission.NewBoundedQueue(noopTraces, int64(sizer.TracesSize(td)-100), 10) + bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(noopTraces, defaultBoundedQueueLimit, 10) + bq = admission.NewBoundedQueue(noopTelemetry, defaultBoundedQueueLimit, 10) } ctc.start(ctc.newRealConsumer, bq) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 23ec4c96fbbc..889b8cd80128 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -41,26 +41,24 @@ func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper // Export implements the service Export logs func. 
func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plogotlp.ExportResponse, error) { ld := req.Logs() - numSpans := ld.LogRecordCount() - if numSpans == 0 { + numRecords := ld.LogRecordCount() + if numRecords == 0 { return plogotlp.NewExportResponse(), nil } ctx = r.obsrecv.StartLogsOp(ctx) + var err error sizeBytes := int64(r.sizer.LogsSize(req.Logs())) - err := r.boundedQueue.Acquire(ctx, sizeBytes) - if err != nil { - return plogotlp.NewExportResponse(), err + if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + err = r.nextConsumer.ConsumeLogs(ctx, ld) + // Release() is not checked, see #36074. + _ = r.boundedQueue.Release(sizeBytes) // immediate release + } else { + err = acqErr } - defer func() { - if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) - } - }() - err = r.nextConsumer.ConsumeLogs(ctx, ld) - r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numSpans, err) + r.obsrecv.EndLogsOp(ctx, dataFormatProtobuf, numRecords, err) return plogotlp.NewExportResponse(), err } diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index bf58f738c34e..a0bbb056f24c 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -8,134 +8,181 @@ import ( "errors" "net" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - 
"go.opentelemetry.io/otel/trace/noop" - "go.uber.org/multierr" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( - maxWaiters = 10 - maxBytes = int64(250) + maxBytes = 250 ) -func TestExport(t *testing.T) { +type testSink struct { + consumertest.LogsSink + context.Context + context.CancelFunc +} + +func newTestSink() *testSink { + ctx, cancel := context.WithCancel(context.Background()) + return &testSink{ + Context: ctx, + CancelFunc: cancel, + } +} + +func (ts *testSink) unblock() { + time.Sleep(10 * time.Millisecond) + ts.CancelFunc() +} + +func (ts *testSink) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + <-ts.Context.Done() + return ts.LogsSink.ConsumeLogs(ctx, ld) +} + +func TestExport_Success(t *testing.T) { ld := testdata.GenerateLogs(1) req := plogotlp.NewExportRequestFromLogs(ld) - logSink := new(consumertest.LogsSink) - logClient := makeLogsServiceClient(t, logSink) - resp, err := logClient.Export(context.Background(), req) + logSink := newTestSink() + logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink) + + go logSink.unblock() + resp, err := logsClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") - lds := logSink.AllLogs() - require.Len(t, lds, 1) - assert.EqualValues(t, ld, lds[0]) + require.Len(t, logSink.AllLogs(), 1) + assert.EqualValues(t, ld, logSink.AllLogs()[0]) + + // One self-tracing spans is issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } func TestExport_EmptyRequest(t *testing.T) { - logSink := new(consumertest.LogsSink) + logSink := newTestSink() + logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink) + empty := plogotlp.NewExportRequest() - logClient := makeLogsServiceClient(t, logSink) - resp, err := logClient.Export(context.Background(), plogotlp.NewExportRequest()) + go logSink.unblock() + resp, err := logsClient.Export(context.Background(), empty) assert.NoError(t, err, "Failed to export trace: %v", err) assert.NotNil(t, resp, "The response is missing") + + require.Empty(t, logSink.AllLogs()) + + // No self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Empty(t, selfExp.GetSpans()) } func TestExport_ErrorConsumer(t *testing.T) { ld := testdata.GenerateLogs(1) req := plogotlp.NewExportRequestFromLogs(ld) - logClient := makeLogsServiceClient(t, consumertest.NewErr(errors.New("my error"))) - resp, err := logClient.Export(context.Background(), req) + logsClient, selfExp, selfProv := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) + resp, err := logsClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Equal(t, plogotlp.ExportResponse{}, resp) + + // One self-tracing span is issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } -func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { +func TestExport_AdmissionRequestTooLarge(t *testing.T) { ld := testdata.GenerateLogs(10) - logSink := new(consumertest.LogsSink) + logSink := newTestSink() req := plogotlp.NewExportRequestFromLogs(ld) + logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink) - logClient := makeLogsServiceClient(t, logSink) - resp, err := logClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + go logSink.unblock() + resp, err := logsClient.Export(context.Background(), req) + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large") assert.Equal(t, plogotlp.ExportResponse{}, resp) -} -func TestExport_TooManyWaiters(t *testing.T) { - bc := testconsumer.NewBlockingConsumer() + // One self-tracing spans is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) +} - logsClient := makeLogsServiceClient(t, bc) - bg := context.Background() - var errs, err error +func TestExport_AdmissionLimitExceeded(t *testing.T) { ld := testdata.GenerateLogs(1) + logSink := newTestSink() req := plogotlp.NewExportRequestFromLogs(ld) - var mtx sync.Mutex - numResponses := 0 - // Send request that will acquire all of the semaphores bytes and block. 
- go func() { - _, err = logsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() - }() - for i := 0; i < maxWaiters+1; i++ { + logsClient, selfExp, selfProv := makeTraceServiceClient(t, logSink) + + var wait sync.WaitGroup + wait.Add(10) + + var expectSuccess atomic.Int32 + + for i := 0; i < 10; i++ { go func() { - _, err := logsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() + defer wait.Done() + _, err := logsClient.Export(context.Background(), req) + if err == nil { + // some succeed! + expectSuccess.Add(1) + return + } + assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data") }() } - // sleep so all async requests are blocked on semaphore Acquire. - time.Sleep(1 * time.Second) + logSink.unblock() + wait.Wait() - // unblock and wait for errors to be returned and written. - bc.Unblock() - assert.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - errSlice := multierr.Errors(errs) - return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3*time.Second, 10*time.Millisecond) + // 10 self-tracing spans are issued + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 10) - assert.ErrorContains(t, errs, "too many waiters") + // Expect the correct number of success and failure. 
+ testSuccess := 0 + for _, span := range selfExp.GetSpans() { + switch span.Status.Code { + case codes.Ok, codes.Unset: + testSuccess++ + } + } + require.Equal(t, int(expectSuccess.Load()), testSuccess) } -func makeLogsServiceClient(t *testing.T, lc consumer.Logs) plogotlp.GRPCClient { - addr := otlpReceiverOnGRPCServer(t, lc) +func makeTraceServiceClient(t *testing.T, lc consumer.Logs) (plogotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) { + addr, exp, tp := otlpReceiverOnGRPCServer(t, lc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) - return plogotlp.NewGRPCClient(cc) + return plogotlp.NewGRPCClient(cc), exp, tp } -func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { +func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) @@ -143,16 +190,23 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { require.NoError(t, ln.Close()) }) + exp := tracetest.NewInMemoryExporter() + + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + telset := componenttest.NewNopTelemetrySettings() + telset.TracerProvider = tp + set := receivertest.NewNopSettings() - set.ID = component.NewIDWithName(component.MustNewType("otlp"), "log") + set.TelemetrySettings = telset + + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "logs") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", ReceiverCreateSettings: set, }) require.NoError(t, err) - - bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(telset, 
maxBytes, 0) r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() @@ -161,5 +215,5 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) net.Addr { _ = srv.Serve(ln) }() - return ln.Addr() + return ln.Addr(), exp, tp } diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index d038d63bef3d..fce93dfeaee7 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -48,18 +48,16 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p ctx = r.obsrecv.StartMetricsOp(ctx) + var err error sizeBytes := int64(r.sizer.MetricsSize(req.Metrics())) - err := r.boundedQueue.Acquire(ctx, sizeBytes) - if err != nil { - return pmetricotlp.NewExportResponse(), err + if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + err = r.nextConsumer.ConsumeMetrics(ctx, md) + // Release() is not checked, see #36074. 
+ _ = r.boundedQueue.Release(sizeBytes) // immediate release + } else { + err = acqErr } - defer func() { - if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) - } - }() - err = r.nextConsumer.ConsumeMetrics(ctx, md) r.obsrecv.EndMetricsOp(ctx, dataFormatProtobuf, dataPointCount, err) return pmetricotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 9bd0b9911e57..77a690963bac 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -8,135 +8,181 @@ import ( "errors" "net" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/multierr" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( - maxWaiters = 10 - maxBytes = int64(250) + maxBytes = 250 ) -func TestExport(t *testing.T) { +type testSink struct { + 
consumertest.MetricsSink + context.Context + context.CancelFunc +} + +func newTestSink() *testSink { + ctx, cancel := context.WithCancel(context.Background()) + return &testSink{ + Context: ctx, + CancelFunc: cancel, + } +} + +func (ts *testSink) unblock() { + time.Sleep(10 * time.Millisecond) + ts.CancelFunc() +} + +func (ts *testSink) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + <-ts.Context.Done() + return ts.MetricsSink.ConsumeMetrics(ctx, md) +} + +func TestExport_Success(t *testing.T) { md := testdata.GenerateMetrics(1) req := pmetricotlp.NewExportRequestFromMetrics(md) - metricSink := new(consumertest.MetricsSink) - metricsClient := makeMetricsServiceClient(t, metricSink) - resp, err := metricsClient.Export(context.Background(), req) + metricsSink := newTestSink() + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) - require.NoError(t, err, "Failed to export metrics: %v", err) + go metricsSink.unblock() + resp, err := metricsClient.Export(context.Background(), req) + require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") - mds := metricSink.AllMetrics() - require.Len(t, mds, 1) - assert.EqualValues(t, md, mds[0]) + require.Len(t, metricsSink.AllMetrics(), 1) + assert.EqualValues(t, md, metricsSink.AllMetrics()[0]) + + // One self-tracing spans is issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } func TestExport_EmptyRequest(t *testing.T) { - metricSink := new(consumertest.MetricsSink) - metricsClient := makeMetricsServiceClient(t, metricSink) - resp, err := metricsClient.Export(context.Background(), pmetricotlp.NewExportRequest()) - require.NoError(t, err) - require.NotNil(t, resp) + metricsSink := newTestSink() + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) + empty := pmetricotlp.NewExportRequest() + + go metricsSink.unblock() + resp, err := metricsClient.Export(context.Background(), empty) + assert.NoError(t, err, "Failed to export trace: %v", err) + assert.NotNil(t, resp, "The response is missing") + + require.Empty(t, metricsSink.AllMetrics()) + + // No self-tracing spans are issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Empty(t, selfExp.GetSpans()) } func TestExport_ErrorConsumer(t *testing.T) { md := testdata.GenerateMetrics(1) req := pmetricotlp.NewExportRequestFromMetrics(md) - metricsClient := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error"))) + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := metricsClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Equal(t, pmetricotlp.ExportResponse{}, resp) + + // One self-tracing span is issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } -func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { +func TestExport_AdmissionRequestTooLarge(t *testing.T) { md := testdata.GenerateMetrics(10) - metricSink := new(consumertest.MetricsSink) + metricsSink := newTestSink() req := pmetricotlp.NewExportRequestFromMetrics(md) + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) - metricsClient := makeMetricsServiceClient(t, metricSink) + go metricsSink.unblock() resp, err := metricsClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large") assert.Equal(t, pmetricotlp.ExportResponse{}, resp) -} -func TestExport_TooManyWaiters(t *testing.T) { - bc := testconsumer.NewBlockingConsumer() + // One self-tracing spans is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) +} - metricsClient := makeMetricsServiceClient(t, bc) - bg := context.Background() - var errs, err error +func TestExport_AdmissionLimitExceeded(t *testing.T) { md := testdata.GenerateMetrics(1) + metricsSink := newTestSink() req := pmetricotlp.NewExportRequestFromMetrics(md) - var mtx sync.Mutex - numResponses := 0 - // Send request that will acquire all of the semaphores bytes and block. 
- go func() { - _, err = metricsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() - }() - for i := 0; i < maxWaiters+1; i++ { + metricsClient, selfExp, selfProv := makeMetricsServiceClient(t, metricsSink) + + var wait sync.WaitGroup + wait.Add(10) + + var expectSuccess atomic.Int32 + + for i := 0; i < 10; i++ { go func() { - _, err := metricsClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() + defer wait.Done() + _, err := metricsClient.Export(context.Background(), req) + if err == nil { + // some succeed! + expectSuccess.Add(1) + return + } + assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data") }() } - // sleep so all async requests are blocked on semaphore Acquire. - time.Sleep(1 * time.Second) + metricsSink.unblock() + wait.Wait() - // unblock and wait for errors to be returned and written. - bc.Unblock() - assert.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - errSlice := multierr.Errors(errs) - return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3*time.Second, 10*time.Millisecond) + // 10 self-tracing spans are issued + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 10) - assert.ErrorContains(t, errs, "too many waiters") + // Expect the correct number of success and failure. 
+ testSuccess := 0 + for _, span := range selfExp.GetSpans() { + switch span.Status.Code { + case codes.Ok, codes.Unset: + testSuccess++ + } + } + require.Equal(t, int(expectSuccess.Load()), testSuccess) } -func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) pmetricotlp.GRPCClient { - addr := otlpReceiverOnGRPCServer(t, mc) - +func makeMetricsServiceClient(t *testing.T, mc consumer.Metrics) (pmetricotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) { + addr, exp, tp := otlpReceiverOnGRPCServer(t, mc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err, "Failed to create the MetricsServiceClient: %v", err) + require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) - return pmetricotlp.NewGRPCClient(cc) + return pmetricotlp.NewGRPCClient(cc), exp, tp } -func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { +func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) @@ -144,7 +190,15 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { require.NoError(t, ln.Close()) }) + exp := tracetest.NewInMemoryExporter() + + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + telset := componenttest.NewNopTelemetrySettings() + telset.TracerProvider = tp + set := receivertest.NewNopSettings() + set.TelemetrySettings = telset + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "metrics") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, @@ -152,8 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - - bq 
:= admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + bq := admission.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() @@ -162,5 +215,5 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) net.Addr { _ = srv.Serve(ln) }() - return ln.Addr() + return ln.Addr(), exp, tp } diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index af9bc335ea19..b2ebcd5bc485 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -41,25 +41,23 @@ func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelp // Export implements the service Export traces func. func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (ptraceotlp.ExportResponse, error) { td := req.Traces() - // We need to ensure that it propagates the receiver name as a tag numSpans := td.SpanCount() if numSpans == 0 { return ptraceotlp.NewExportResponse(), nil } + ctx = r.obsrecv.StartTracesOp(ctx) + var err error sizeBytes := int64(r.sizer.TracesSize(req.Traces())) - err := r.boundedQueue.Acquire(ctx, sizeBytes) - if err != nil { - return ptraceotlp.NewExportResponse(), err + if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + err = r.nextConsumer.ConsumeTraces(ctx, td) + // Release() is not checked, see #36074. 
+ _ = r.boundedQueue.Release(sizeBytes) // immediate release + } else { + err = acqErr } - defer func() { - if releaseErr := r.boundedQueue.Release(sizeBytes); releaseErr != nil { - r.logger.Error("Error releasing bytes from semaphore", zap.Error(releaseErr)) - } - }() - err = r.nextConsumer.ConsumeTraces(ctx, td) r.obsrecv.EndTracesOp(ctx, dataFormatProtobuf, numSpans, err) return ptraceotlp.NewExportResponse(), err diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index b968b79d20d8..ce769f5451c4 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -8,133 +8,181 @@ import ( "errors" "net" "sync" + "sync/atomic" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.opentelemetry.io/otel/trace/noop" - "go.uber.org/multierr" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/testconsumer" ) const ( - maxWaiters = 10 - maxBytes = int64(250) + maxBytes = 250 ) -func TestExport(t *testing.T) { +type testSink struct { + consumertest.TracesSink + 
context.Context + context.CancelFunc +} + +func newTestSink() *testSink { + ctx, cancel := context.WithCancel(context.Background()) + return &testSink{ + Context: ctx, + CancelFunc: cancel, + } +} + +func (ts *testSink) unblock() { + time.Sleep(10 * time.Millisecond) + ts.CancelFunc() +} + +func (ts *testSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + <-ts.Context.Done() + return ts.TracesSink.ConsumeTraces(ctx, td) +} + +func TestExport_Success(t *testing.T) { td := testdata.GenerateTraces(1) req := ptraceotlp.NewExportRequestFromTraces(td) - traceSink := new(consumertest.TracesSink) - traceClient := makeTraceServiceClient(t, traceSink) + traceSink := newTestSink() + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) + + go traceSink.unblock() resp, err := traceClient.Export(context.Background(), req) require.NoError(t, err, "Failed to export trace: %v", err) require.NotNil(t, resp, "The response is missing") require.Len(t, traceSink.AllTraces(), 1) assert.EqualValues(t, td, traceSink.AllTraces()[0]) + + // One self-tracing spans is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } func TestExport_EmptyRequest(t *testing.T) { - traceSink := new(consumertest.TracesSink) - traceClient := makeTraceServiceClient(t, traceSink) - resp, err := traceClient.Export(context.Background(), ptraceotlp.NewExportRequest()) + traceSink := newTestSink() + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) + empty := ptraceotlp.NewExportRequest() + + go traceSink.unblock() + resp, err := traceClient.Export(context.Background(), empty) assert.NoError(t, err, "Failed to export trace: %v", err) assert.NotNil(t, resp, "The response is missing") + + require.Empty(t, traceSink.AllTraces()) + + // No self-tracing spans are issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Empty(t, selfExp.GetSpans()) } func TestExport_ErrorConsumer(t *testing.T) { td := testdata.GenerateTraces(1) req := ptraceotlp.NewExportRequestFromTraces(td) - traceClient := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) + traceClient, selfExp, selfProv := makeTraceServiceClient(t, consumertest.NewErr(errors.New("my error"))) resp, err := traceClient.Export(context.Background(), req) assert.EqualError(t, err, "rpc error: code = Unknown desc = my error") assert.Equal(t, ptraceotlp.ExportResponse{}, resp) + + // One self-tracing spans is issued. + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) } -func TestExport_AdmissionLimitBytesExceeded(t *testing.T) { +func TestExport_AdmissionRequestTooLarge(t *testing.T) { td := testdata.GenerateTraces(10) - traceSink := new(consumertest.TracesSink) + traceSink := newTestSink() req := ptraceotlp.NewExportRequestFromTraces(td) + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) - traceClient := makeTraceServiceClient(t, traceSink) - + go traceSink.unblock() resp, err := traceClient.Export(context.Background(), req) - assert.EqualError(t, err, "rpc error: code = Unknown desc = rejecting request, request size larger than configured limit") + assert.EqualError(t, err, "rpc error: code = InvalidArgument desc = rejecting request, request is too large") assert.Equal(t, ptraceotlp.ExportResponse{}, resp) -} -func TestExport_TooManyWaiters(t *testing.T) { - bc := testconsumer.NewBlockingConsumer() + // One self-tracing spans is issued. 
+ require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 1) +} - traceClient := makeTraceServiceClient(t, bc) - bg := context.Background() - var errs, err error +func TestExport_AdmissionLimitExceeded(t *testing.T) { td := testdata.GenerateTraces(1) + traceSink := newTestSink() req := ptraceotlp.NewExportRequestFromTraces(td) - var mtx sync.Mutex - numResponses := 0 - // Send request that will acquire all of the semaphores bytes and block. - go func() { - _, err = traceClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() - }() - for i := 0; i < maxWaiters+1; i++ { + traceClient, selfExp, selfProv := makeTraceServiceClient(t, traceSink) + + var wait sync.WaitGroup + wait.Add(10) + + var expectSuccess atomic.Int32 + + for i := 0; i < 10; i++ { go func() { - _, err := traceClient.Export(bg, req) - mtx.Lock() - errs = multierr.Append(errs, err) - numResponses++ - mtx.Unlock() + defer wait.Done() + _, err := traceClient.Export(context.Background(), req) + if err == nil { + // some succeed! + expectSuccess.Add(1) + return + } + assert.EqualError(t, err, "rpc error: code = ResourceExhausted desc = rejecting request, too much pending data") }() } - // sleep so all async requests are blocked on semaphore Acquire. - time.Sleep(1 * time.Second) + traceSink.unblock() + wait.Wait() - // unblock and wait for errors to be returned and written. - bc.Unblock() - assert.Eventually(t, func() bool { - mtx.Lock() - defer mtx.Unlock() - errSlice := multierr.Errors(errs) - return numResponses == maxWaiters+2 && len(errSlice) == 1 - }, 3*time.Second, 10*time.Millisecond) + // 10 self-tracing spans are issued + require.NoError(t, selfProv.ForceFlush(context.Background())) + require.Len(t, selfExp.GetSpans(), 10) - assert.ErrorContains(t, errs, "too many waiters") + // Expect the correct number of success and failure. 
+ testSuccess := 0 + for _, span := range selfExp.GetSpans() { + switch span.Status.Code { + case codes.Ok, codes.Unset: + testSuccess++ + } + } + require.Equal(t, int(expectSuccess.Load()), testSuccess) } -func makeTraceServiceClient(t *testing.T, tc consumer.Traces) ptraceotlp.GRPCClient { - addr := otlpReceiverOnGRPCServer(t, tc) +func makeTraceServiceClient(t *testing.T, tc consumer.Traces) (ptraceotlp.GRPCClient, *tracetest.InMemoryExporter, *trace.TracerProvider) { + addr, exp, tp := otlpReceiverOnGRPCServer(t, tc) cc, err := grpc.NewClient(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err, "Failed to create the TraceServiceClient: %v", err) t.Cleanup(func() { require.NoError(t, cc.Close()) }) - return ptraceotlp.NewGRPCClient(cc) + return ptraceotlp.NewGRPCClient(cc), exp, tp } -func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { +func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, *tracetest.InMemoryExporter, *trace.TracerProvider) { ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) @@ -142,7 +190,15 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { require.NoError(t, ln.Close()) }) + exp := tracetest.NewInMemoryExporter() + + tp := trace.NewTracerProvider(trace.WithSyncer(exp)) + telset := componenttest.NewNopTelemetrySettings() + telset.TracerProvider = tp + set := receivertest.NewNopSettings() + set.TelemetrySettings = telset + set.ID = component.NewIDWithName(component.MustNewType("otlp"), "trace") obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, @@ -150,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(noop.NewTracerProvider(), maxBytes, maxWaiters) + bq := 
admission.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() @@ -159,5 +215,5 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) net.Addr { _ = srv.Serve(ln) }() - return ln.Addr() + return ln.Addr(), exp, tp } diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 587c59ee448b..8d11523ea855 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -66,7 +66,7 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if err != nil { return nil, err } - bq := admission.NewBoundedQueue(set.TracerProvider, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) + bq := admission.NewBoundedQueue(set.TelemetrySettings, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) r := &otelArrowReceiver{ cfg: cfg, settings: set,