From 20bae43c7af3eb57780b96b56dc5dc0cca0b89b8 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 1 Nov 2024 08:49:16 -0700 Subject: [PATCH] (otelarrowreceiver): Use a single call to BoundedQueue.Acquire (#36082) #### Description Simplifies the admission control logic for OTAP payloads. We call Acquire() once after uncompressing the data, instead of once with compressed size and once with the difference. #### Link to tracking issue Part of #36074. #### Testing One test is replaced with logic to verify certain BoundedQueue actions. ~Note: the OTel-Arrow test suite will not pass with this PR until it merges with #36078.~ Originally developed in #36033. #### Documentation Not user-visible. --- .chloggen/otelarrow-arrow-single-acquire.yaml | 27 ++++++ receiver/otelarrowreceiver/go.mod | 2 +- .../otelarrowreceiver/internal/arrow/arrow.go | 96 +++---------------- .../internal/arrow/arrow_test.go | 80 ++++------------ 4 files changed, 62 insertions(+), 143 deletions(-) create mode 100644 .chloggen/otelarrow-arrow-single-acquire.yaml diff --git a/.chloggen/otelarrow-arrow-single-acquire.yaml b/.chloggen/otelarrow-arrow-single-acquire.yaml new file mode 100644 index 000000000000..3fbebcd526d2 --- /dev/null +++ b/.chloggen/otelarrow-arrow-single-acquire.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Simplify receiver admission control logic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index 42e6763677e8..aae77a6179a3 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -34,7 +34,6 @@ require ( go.uber.org/zap v1.27.0 golang.org/x/net v0.30.0 google.golang.org/grpc v1.67.1 - google.golang.org/protobuf v1.35.1 ) require ( @@ -90,6 +89,7 @@ require ( golang.org/x/tools v0.22.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect + google.golang.org/protobuf v1.35.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index b5fe25d72e28..13fdcd2395f8 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -8,9 +8,7 @@ import ( "errors" "fmt" "io" - "net" "runtime" - "strconv" "strings" "sync" "sync/atomic" @@ -39,7 +37,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" @@ -454,9 +451,8 @@ type inFlightData struct { // consumeAndRespond() function. refs atomic.Int32 - numAcquired int64 // how many bytes held in the semaphore - numItems int // how many items - uncompSize int64 // uncompressed data size + numItems int // how many items + uncompSize int64 // uncompressed data size == how many bytes held in the semaphore } func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { @@ -505,8 +501,8 @@ func (id *inFlightData) anyDone(ctx context.Context) { id.span.End() - if id.numAcquired != 0 { - if err := id.boundedQueue.Release(id.numAcquired); err != nil { + if id.uncompSize != 0 { + if err := id.boundedQueue.Release(id.uncompSize); err != nil { id.telemetry.Logger.Error("release error", zap.Error(err)) } } @@ -606,19 +602,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre } } - var prevAcquiredBytes int64 - uncompSizeHeaderStr, uncompSizeHeaderFound := authHdrs["otlp-pdata-size"] - if !uncompSizeHeaderFound || len(uncompSizeHeaderStr) == 0 { - // This is a compressed size so make sure to acquire the difference when request is decompressed. - prevAcquiredBytes = int64(proto.Size(req)) - } else { - var parseErr error - prevAcquiredBytes, parseErr = strconv.ParseInt(uncompSizeHeaderStr[0], 10, 64) - if parseErr != nil { - return status.Errorf(codes.Internal, "failed to convert string to request size: %v", parseErr) - } - } - var callerCancel context.CancelFunc if encodedTimeout, has := authHdrs["grpc-timeout"]; has && len(encodedTimeout) == 1 { if timeout, decodeErr := grpcutil.DecodeTimeout(encodedTimeout[0]); decodeErr != nil { @@ -638,17 +621,6 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre } } - // Use the bounded queue to memory limit based on incoming - // uncompressed request size and waiters. Acquire will fail - // immediately if there are too many waiters, or will - // otherwise block until timeout or enough memory becomes - // available. - acquireErr := r.boundedQueue.Acquire(inflightCtx, prevAcquiredBytes) - if acquireErr != nil { - return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue: %v", acquireErr) - } - flight.numAcquired = prevAcquiredBytes - data, numItems, uncompSize, consumeErr := r.consumeBatch(ac, req) if consumeErr != nil { @@ -658,19 +630,21 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre return status.Errorf(codes.Internal, "otel-arrow decode: %v", consumeErr) } + // Use the bounded queue to memory limit based on incoming + // uncompressed request size and waiters. Acquire will fail + // immediately if there are too many waiters, or will + // otherwise block until timeout or enough memory becomes + // available. + acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize) + if acquireErr != nil { + return acquireErr + } flight.uncompSize = uncompSize flight.numItems = numItems r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize) r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems)) - numAcquired, secondAcquireErr := r.acquireAdditionalBytes(inflightCtx, prevAcquiredBytes, uncompSize, hrcv.connInfo.Addr, uncompSizeHeaderFound) - - flight.numAcquired = numAcquired - if secondAcquireErr != nil { - return status.Errorf(codes.ResourceExhausted, "otel-arrow bounded queue re-acquire: %v", secondAcquireErr) - } - // Recognize that the request is still in-flight via consumeAndRespond() flight.refs.Add(1) @@ -901,47 +875,3 @@ func (r *Receiver) consumeData(ctx context.Context, data any, flight *inFlightDa } return retErr } - -func (r *Receiver) acquireAdditionalBytes(ctx context.Context, prevAcquired, uncompSize int64, addr net.Addr, uncompSizeHeaderFound bool) (int64, error) { - diff := uncompSize - prevAcquired - - if diff == 0 { - return uncompSize, nil - } - - if uncompSizeHeaderFound { - var clientAddr string - if addr != nil { - clientAddr = addr.String() - } - // a mismatch between header set by exporter and the uncompSize just calculated. - r.telemetry.Logger.Debug("mismatch between uncompressed size in receiver and otlp-pdata-size header", - zap.String("client-address", clientAddr), - zap.Int("uncompsize", int(uncompSize)), - zap.Int("otlp-pdata-size", int(prevAcquired)), - ) - } else if diff < 0 { - // proto.Size() on compressed request was greater than pdata uncompressed size. - r.telemetry.Logger.Debug("uncompressed size is less than compressed size", - zap.Int("uncompressed", int(uncompSize)), - zap.Int("compressed", int(prevAcquired)), - ) - } - - if diff < 0 { - // If the difference is negative, release the overage. - if err := r.boundedQueue.Release(-diff); err != nil { - return 0, err - } - } else { - // Release previously acquired bytes to prevent deadlock and - // reacquire the uncompressed size we just calculated. - if err := r.boundedQueue.Release(prevAcquired); err != nil { - return 0, err - } - if err := r.boundedQueue.Acquire(ctx, uncompSize); err != nil { - return 0, err - } - } - return uncompSize, nil -} diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 6676b438dca6..dcbe0f8546d3 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -9,7 +9,6 @@ import ( "encoding/json" "fmt" "io" - "strconv" "strings" "sync" "testing" @@ -408,6 +407,10 @@ func requireExhaustedStatus(t *testing.T, err error) { requireStatus(t, codes.ResourceExhausted, err) } +func requireInvalidArgumentStatus(t *testing.T, err error) { + requireStatus(t, codes.InvalidArgument, err) +} + func requireStatus(t *testing.T, code codes.Code, err error) { require.Error(t, err) status, ok := status.FromError(err) @@ -415,55 +418,26 @@ func requireStatus(t *testing.T, code codes.Code, err error) { require.Equal(t, code, status.Code()) } -func TestBoundedQueueWithPdataHeaders(t *testing.T) { +func TestBoundedQueueLimits(t *testing.T) { var sizer ptrace.ProtoMarshaler stdTesting := otelAssert.NewStdUnitTest(t) - pdataSizeTenTraces := sizer.TracesSize(testdata.GenerateTraces(10)) - defaultBoundedQueueLimit := int64(100000) + td := testdata.GenerateTraces(10) + tdSize := int64(sizer.TracesSize(td)) + tests := []struct { - name string - numTraces int - includePdataHeader bool - pdataSize string - rejected bool + name string + admitLimit int64 + expectErr bool }{ { - name: "no header compressed greater than uncompressed", - numTraces: 10, - }, - { - name: "no header compressed less than uncompressed", - numTraces: 100, - }, - { - name: "pdata header less than uncompressedSize", - numTraces: 10, - pdataSize: strconv.Itoa(pdataSizeTenTraces / 2), - includePdataHeader: true, - }, - { - name: "pdata header equal uncompressedSize", - numTraces: 10, - pdataSize: strconv.Itoa(pdataSizeTenTraces), - includePdataHeader: true, + name: "admit request", + admitLimit: tdSize * 2, + expectErr: false, }, { - name: "pdata header greater than uncompressedSize", - numTraces: 10, - pdataSize: strconv.Itoa(pdataSizeTenTraces * 2), - includePdataHeader: true, - }, - { - name: "no header compressed accepted uncompressed rejected", - numTraces: 100, - rejected: true, - }, - { - name: "pdata header accepted uncompressed rejected", - numTraces: 100, - rejected: true, - pdataSize: strconv.Itoa(pdataSizeTenTraces), - includePdataHeader: true, + name: "reject request", + admitLimit: tdSize / 2, + expectErr: true, }, } for _, tt := range tests { @@ -471,35 +445,23 @@ func TestBoundedQueueWithPdataHeaders(t *testing.T) { tc := newHealthyTestChannel(t) ctc := newCommonTestCase(t, tc) - td := testdata.GenerateTraces(tt.numTraces) batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) require.NoError(t, err) - if tt.includePdataHeader { - var hpb bytes.Buffer - hpe := hpack.NewEncoder(&hpb) - err = hpe.WriteField(hpack.HeaderField{ - Name: "otlp-pdata-size", - Value: tt.pdataSize, - }) - assert.NoError(t, err) - batch.Headers = make([]byte, hpb.Len()) - copy(batch.Headers, hpb.Bytes()) - } var bq admission.Queue - if tt.rejected { + if tt.expectErr { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(noopTelemetry, defaultBoundedQueueLimit, 10) + bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10) } ctc.start(ctc.newRealConsumer, bq) ctc.putBatch(batch, nil) - if tt.rejected { - requireExhaustedStatus(t, ctc.wait()) + if tt.expectErr { + requireInvalidArgumentStatus(t, ctc.wait()) } else { data := <-ctc.consume actualTD := data.Data.(ptrace.Traces)