From 8ea5956d78eb27bbb81ead15e431302feadba902 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Thu, 14 Nov 2024 08:41:28 -0800 Subject: [PATCH] (otelarrowreceiver): LIFO-based admission control, waiting limit expressed in MiB of request data (#36141) --- .chloggen/otelarrow-admission.yaml | 27 +++ internal/otelarrow/admission/README.md | 20 -- internal/otelarrow/admission/boundedqueue.go | 164 ------------- .../otelarrow/admission/boundedqueue_test.go | 219 ------------------ internal/otelarrow/admission/controller.go | 59 ----- internal/otelarrow/go.mod | 6 +- internal/otelarrow/go.sum | 9 - internal/otelarrow/test/e2e_test.go | 7 +- receiver/otelarrowreceiver/README.md | 6 +- receiver/otelarrowreceiver/config.go | 7 +- receiver/otelarrowreceiver/config_test.go | 5 +- receiver/otelarrowreceiver/factory.go | 4 +- receiver/otelarrowreceiver/go.mod | 4 - receiver/otelarrowreceiver/go.sum | 9 - .../otelarrowreceiver/internal/arrow/arrow.go | 16 +- .../internal/arrow/arrow_test.go | 21 +- .../otelarrowreceiver/internal/logs/otlp.go | 13 +- .../internal/logs/otlp_test.go | 4 +- .../internal/metrics/otlp.go | 13 +- .../internal/metrics/otlp_test.go | 4 +- .../otelarrowreceiver/internal/trace/otlp.go | 13 +- .../internal/trace/otlp_test.go | 4 +- receiver/otelarrowreceiver/otelarrow.go | 10 +- .../otelarrowreceiver/testdata/config.yaml | 2 +- 24 files changed, 90 insertions(+), 556 deletions(-) create mode 100644 .chloggen/otelarrow-admission.yaml delete mode 100644 internal/otelarrow/admission/README.md delete mode 100644 internal/otelarrow/admission/boundedqueue.go delete mode 100644 internal/otelarrow/admission/boundedqueue_test.go delete mode 100644 internal/otelarrow/admission/controller.go diff --git a/.chloggen/otelarrow-admission.yaml b/.chloggen/otelarrow-admission.yaml new file mode 100644 index 000000000000..8bd92566d66c --- /dev/null +++ b/.chloggen/otelarrow-admission.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Admission control improvements (LIFO); admission.waiter_limit is deprecated, replaced with admission.waiting_limit_mib. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/otelarrow/admission/README.md b/internal/otelarrow/admission/README.md deleted file mode 100644 index 053ad05a8fcf..000000000000 --- a/internal/otelarrow/admission/README.md +++ /dev/null @@ -1,20 +0,0 @@ -# Admission Package - -## Overview - -The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`. - -This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues: -1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request. -2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage. - -## Usage - -Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiters)` - -Within the component call `bq.Acquire(ctx, requestSize)` which will either -1. succeed immediately if there is enough available memory -2. fail immediately if there are too many waiters -3. block until context cancelation or enough bytes becomes available - -Once a request has finished processing and is sent downstream call `bq.Release(requestSize)` to allow waiters to be admitted for processing. Release should only fail if releasing more bytes than previously acquired. \ No newline at end of file diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go deleted file mode 100644 index 77a270a7d3dd..000000000000 --- a/internal/otelarrow/admission/boundedqueue.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" - -import ( - "context" - "fmt" - "sync" - - "github.com/google/uuid" - orderedmap "github.com/wk8/go-ordered-map/v2" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - grpccodes "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var ErrTooManyWaiters = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data") -var ErrRequestTooLarge = status.Error(grpccodes.InvalidArgument, "rejecting request, request is too large") - -type BoundedQueue struct { - maxLimitBytes int64 - maxLimitWaiters int64 - currentBytes int64 - currentWaiters int64 - lock sync.Mutex - waiters *orderedmap.OrderedMap[uuid.UUID, waiter] - tracer trace.Tracer -} - -type waiter struct { - readyCh chan struct{} - pendingBytes int64 - ID uuid.UUID -} - -func NewBoundedQueue(ts component.TelemetrySettings, maxLimitBytes, maxLimitWaiters int64) Queue { - return &BoundedQueue{ - maxLimitBytes: maxLimitBytes, - maxLimitWaiters: maxLimitWaiters, - waiters: orderedmap.New[uuid.UUID, waiter](), - tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), - } -} - -func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) { - bq.lock.Lock() - defer bq.lock.Unlock() - - if pendingBytes > bq.maxLimitBytes { // will never succeed - return false, ErrRequestTooLarge - } - - if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit - bq.currentBytes += pendingBytes - return true, nil - } - - // since we were unable to admit, check if we can wait. - if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters - return false, ErrTooManyWaiters - } - - // if we got to this point we need to wait to acquire bytes, so update currentWaiters before releasing mutex. - bq.currentWaiters++ - return false, nil -} - -func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { - success, err := bq.admit(pendingBytes) - if err != nil || success { - return err - } - - // otherwise we need to wait for bytes to be released - curWaiter := waiter{ - pendingBytes: pendingBytes, - readyCh: make(chan struct{}), - } - - bq.lock.Lock() - - // generate unique key - for { - id := uuid.New() - _, keyExists := bq.waiters.Get(id) - if keyExists { - continue - } - bq.waiters.Set(id, curWaiter) - curWaiter.ID = id - break - } - - bq.lock.Unlock() - ctx, span := bq.tracer.Start(ctx, "admission_blocked", - trace.WithAttributes(attribute.Int64("pending", pendingBytes))) - defer span.End() - - select { - case <-curWaiter.readyCh: - return nil - case <-ctx.Done(): - // canceled before acquired so remove waiter. - bq.lock.Lock() - defer bq.lock.Unlock() - err = fmt.Errorf("context canceled: %w ", ctx.Err()) - span.SetStatus(codes.Error, "context canceled") - - _, found := bq.waiters.Delete(curWaiter.ID) - if !found { - return err - } - - bq.currentWaiters-- - return err - } -} - -func (bq *BoundedQueue) Release(pendingBytes int64) error { - bq.lock.Lock() - defer bq.lock.Unlock() - - bq.currentBytes -= pendingBytes - - if bq.currentBytes < 0 { - return fmt.Errorf("released more bytes than acquired") - } - - for { - if bq.waiters.Len() == 0 { - return nil - } - next := bq.waiters.Oldest() - nextWaiter := next.Value - nextKey := next.Key - if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes { - bq.currentBytes += nextWaiter.pendingBytes - bq.currentWaiters-- - close(nextWaiter.readyCh) - _, found := bq.waiters.Delete(nextKey) - if !found { - return fmt.Errorf("deleting waiter that doesn't exist") - } - continue - } - break - } - - return nil -} - -func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool { - bq.lock.Lock() - defer bq.lock.Unlock() - if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { - bq.currentBytes += pendingBytes - return true - } - return false -} diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go deleted file mode 100644 index ffd616018ea2..000000000000 --- a/internal/otelarrow/admission/boundedqueue_test.go +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package admission - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.uber.org/multierr" -) - -func min(x, y int64) int64 { - if x <= y { - return x - } - return y -} - -func max(x, y int64) int64 { - if x >= y { - return x - } - return y -} - -func abs(x int64) int64 { - if x < 0 { - return -x - } - return x -} - -var noopTelemetry = componenttest.NewNopTelemetrySettings() - -func newTestQueue(limitBytes, limitWaiters int64) *BoundedQueue { - return NewBoundedQueue(noopTelemetry, limitBytes, limitWaiters).(*BoundedQueue) -} - -func TestAcquireSimpleNoWaiters(t *testing.T) { - maxLimitBytes := 1000 - maxLimitWaiters := 10 - numRequests := 40 - requestSize := 21 - - bq := newTestQueue(int64(maxLimitBytes), int64(maxLimitWaiters)) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - for i := 0; i < numRequests; i++ { - go func() { - err := bq.Acquire(ctx, int64(requestSize)) - assert.NoError(t, err) - }() - } - - require.Never(t, func() bool { - return bq.waiters.Len() > 0 - }, 2*time.Second, 10*time.Millisecond) - - for i := 0; i < numRequests; i++ { - assert.NoError(t, bq.Release(int64(requestSize))) - assert.Equal(t, int64(0), bq.currentWaiters) - } - - assert.ErrorContains(t, bq.Release(int64(1)), "released more bytes than acquired") - assert.NoError(t, bq.Acquire(ctx, int64(maxLimitBytes))) -} - -func TestAcquireBoundedWithWaiters(t *testing.T) { - tests := []struct { - name string - maxLimitBytes int64 - maxLimitWaiters int64 - numRequests int64 - requestSize int64 - timeout time.Duration - }{ - { - name: "below max waiters above max bytes", - maxLimitBytes: 1000, - maxLimitWaiters: 100, - numRequests: 100, - requestSize: 21, - timeout: 5 * time.Second, - }, - { - name: "above max waiters above max bytes", - maxLimitBytes: 1000, - maxLimitWaiters: 100, - numRequests: 200, - requestSize: 21, - timeout: 5 * time.Second, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - bq := newTestQueue(tt.maxLimitBytes, tt.maxLimitWaiters) - var blockedRequests int64 - numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize - requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked) - tooManyWaiters := requestsAboveLimit > tt.maxLimitWaiters - numRejected := max(requestsAboveLimit-tt.maxLimitWaiters, int64(0)) - - // There should never be more blocked requests than maxLimitWaiters. - blockedRequests = min(tt.maxLimitWaiters, requestsAboveLimit) - - ctx, cancel := context.WithTimeout(context.Background(), tt.timeout) - defer cancel() - var errs error - for i := 0; i < int(tt.numRequests); i++ { - go func() { - err := bq.Acquire(ctx, tt.requestSize) - bq.lock.Lock() - defer bq.lock.Unlock() - errs = multierr.Append(errs, err) - }() - } - - require.Eventually(t, func() bool { - bq.lock.Lock() - defer bq.lock.Unlock() - return bq.waiters.Len() == int(blockedRequests) - }, 3*time.Second, 10*time.Millisecond) - - assert.NoError(t, bq.Release(tt.requestSize)) - assert.Equal(t, bq.waiters.Len(), int(blockedRequests)-1) - - for i := 0; i < int(tt.numRequests-numRejected)-1; i++ { - assert.NoError(t, bq.Release(tt.requestSize)) - } - - bq.lock.Lock() - if tooManyWaiters { - assert.ErrorContains(t, errs, ErrTooManyWaiters.Error()) - } else { - assert.NoError(t, errs) - } - bq.lock.Unlock() - - // confirm all bytes were released by acquiring maxLimitBytes. - assert.True(t, bq.TryAcquire(tt.maxLimitBytes)) - }) - } -} - -func TestAcquireContextCanceled(t *testing.T) { - maxLimitBytes := 1000 - maxLimitWaiters := 100 - numRequests := 100 - requestSize := 21 - numReqsUntilBlocked := maxLimitBytes / requestSize - requestsAboveLimit := abs(int64(numRequests) - int64(numReqsUntilBlocked)) - - blockedRequests := min(int64(maxLimitWaiters), requestsAboveLimit) - - exp := tracetest.NewInMemoryExporter() - tp := trace.NewTracerProvider(trace.WithSyncer(exp)) - ts := noopTelemetry - ts.TracerProvider = tp - - bq := NewBoundedQueue(ts, int64(maxLimitBytes), int64(maxLimitWaiters)).(*BoundedQueue) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - var errs error - var wg sync.WaitGroup - for i := 0; i < numRequests; i++ { - wg.Add(1) - go func() { - err := bq.Acquire(ctx, int64(requestSize)) - bq.lock.Lock() - defer bq.lock.Unlock() - errs = multierr.Append(errs, err) - wg.Done() - }() - } - - // Wait until all calls to Acquire() happen and we have the expected number of waiters. - require.Eventually(t, func() bool { - bq.lock.Lock() - defer bq.lock.Unlock() - return bq.waiters.Len() == int(blockedRequests) - }, 3*time.Second, 10*time.Millisecond) - - cancel() - wg.Wait() - assert.ErrorContains(t, errs, "context canceled") - - // Expect spans named admission_blocked w/ context canceled. - spans := exp.GetSpans() - exp.Reset() - assert.NotEmpty(t, spans) - for _, span := range spans { - assert.Equal(t, "admission_blocked", span.Name) - assert.Equal(t, codes.Error, span.Status.Code) - assert.Equal(t, "context canceled", span.Status.Description) - } - - // Now all waiters should have returned and been removed. - assert.Equal(t, 0, bq.waiters.Len()) - - for i := 0; i < numReqsUntilBlocked; i++ { - assert.NoError(t, bq.Release(int64(requestSize))) - assert.Equal(t, int64(0), bq.currentWaiters) - } - assert.True(t, bq.TryAcquire(int64(maxLimitBytes))) - - // Expect no more spans, because admission was not blocked. - spans = exp.GetSpans() - require.Empty(t, spans) -} diff --git a/internal/otelarrow/admission/controller.go b/internal/otelarrow/admission/controller.go deleted file mode 100644 index 989c4a4467cb..000000000000 --- a/internal/otelarrow/admission/controller.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" - -import ( - "context" -) - -// Queue is a weighted admission queue interface. -type Queue interface { - // Acquire asks the controller to admit the caller. - // - // The weight parameter specifies how large of an admission to make. - // This might be used on the bytes of request (for example) to differentiate - // between large and small requests. - // - // Admit will return when one of the following events occurs: - // - // (1) admission is allowed, or - // (2) the provided ctx becomes canceled, or - // (3) there are so many existing waiters that the - // controller decides to reject this caller without - // admitting it. - // - // In case (1), the return value will be a non-nil error. The - // caller must invoke Release(weight) after it is finished - // with the resource being guarded by the admission - // controller. - // - // In case (2), the return value will be a Cancelled or - // DeadlineExceeded error. - // - // In case (3), the return value will be a ResourceExhausted - // error. - Acquire(ctx context.Context, weight int64) error - - // Release will be eliminated as part of issue #36074. - Release(weight int64) error -} - -type noopController struct{} - -var _ Queue = noopController{} - -// NewUnboundedQueue returns a no-op implementation of the Queue interface. -func NewUnboundedQueue() Queue { - return noopController{} -} - -// Acquire implements Queue. -func (noopController) Acquire(_ context.Context, _ int64) error { - return nil -} - -// Acquire implements Queue. -func (noopController) Release(_ int64) error { - return nil -} diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index 0b10b362f498..f42ea96a0fa6 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -3,13 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelar go 1.22.0 require ( - github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.17.11 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter v0.113.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver v0.113.0 github.com/open-telemetry/otel-arrow v0.30.0 github.com/stretchr/testify v1.9.0 - github.com/wk8/go-ordered-map/v2 v2.1.8 go.opentelemetry.io/collector/component v0.113.0 go.opentelemetry.io/collector/config/configgrpc v0.113.0 go.opentelemetry.io/collector/config/configtelemetry v0.113.0 @@ -35,9 +33,7 @@ require ( github.com/apache/arrow/go/v16 v16.1.0 // indirect github.com/apache/arrow/go/v17 v17.0.0 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/brianvoe/gofakeit/v6 v6.17.0 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect @@ -50,12 +46,12 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect - github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/internal/otelarrow/go.sum b/internal/otelarrow/go.sum index 2c932716bad2..31559a8aa84a 100644 --- a/internal/otelarrow/go.sum +++ b/internal/otelarrow/go.sum @@ -9,12 +9,8 @@ github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -51,7 +47,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -73,8 +68,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -100,8 +93,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= -github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index f6b3f3fc4578..348956d2d681 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -711,12 +711,7 @@ func TestIntegrationAdmissionLimited(t *testing.T) { testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) { rcfg.Admission.RequestLimitMiB = admitLimit - - // Note: #36074 will change WaiterLimit to WaitingLimitMiB - // measured in bytes, not request count. This test is designed - // to work either way by virtue of having requests that are - // just shy of 1MiB. - rcfg.Admission.WaiterLimit = int64(waitingLimit) + rcfg.Admission.WaitingLimitMiB = waitingLimit ecfg.Arrow.NumStreams = 10 diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index b34d99c18e7f..2088caec1bb0 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -82,11 +82,11 @@ Several common configuration structures provide additional capabilities automati In the `admission` configuration block the following settings are available: -- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. When this field is set to zero, admission control is disabled. +- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib`, which limits allocations made by the consumer when translating arrow records into pdata objects. The `request_limit_mib` value is used to control how much traffic we admit, but does not control how much memory is used during request processing. -- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. +- `waiting_limit_mib` (default: 32): limits the number of requests waiting on admission after `request_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. -`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. +`request_limit_mib` and `waiting_limit_mib` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission2). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. ### Arrow-specific Configuration diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index 0f674d656b09..934f8e26bc64 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -25,10 +25,10 @@ type AdmissionConfig struct { // for processing. When this field is zero, admission control is disabled. RequestLimitMiB uint64 `mapstructure:"request_limit_mib"` - // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. + // WaitingLimitMiB is the limit on the amount of data waiting to be consumed. // This is a dimension of memory limiting to ensure waiters are not consuming an // unexpectedly large amount of memory in the arrow receiver. - WaiterLimit int64 `mapstructure:"waiter_limit"` + WaitingLimitMiB uint64 `mapstructure:"waiting_limit_mib"` } // ArrowConfig support configuring the Arrow receiver. @@ -86,8 +86,5 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error { if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } - if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 { - cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit - } return nil } diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 29a1e8a08ad5..d50edab9fac2 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -82,7 +82,7 @@ func TestUnmarshalConfig(t *testing.T) { }, Admission: AdmissionConfig{ RequestLimitMiB: 80, - WaiterLimit: 100, + WaitingLimitMiB: 100, }, }, cfg) } @@ -106,7 +106,6 @@ func TestValidateDeprecatedConfig(t *testing.T) { Admission: AdmissionConfig{ // cfg.Validate should now set these fields. RequestLimitMiB: 80, - WaiterLimit: 100, }, }, cfg) } @@ -133,7 +132,7 @@ func TestUnmarshalConfigUnix(t *testing.T) { }, Admission: AdmissionConfig{ RequestLimitMiB: defaultRequestLimitMiB, - WaiterLimit: defaultWaiterLimit, + WaitingLimitMiB: defaultWaitingLimitMiB, }, }, cfg) } diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index 07ccc1bbbb1a..c440a8de925b 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -21,7 +21,7 @@ const ( defaultMemoryLimitMiB = 128 defaultRequestLimitMiB = 128 - defaultWaiterLimit = 1000 + defaultWaitingLimitMiB = 32 ) // NewFactory creates a new OTel-Arrow receiver factory. @@ -52,7 +52,7 @@ func createDefaultConfig() component.Config { }, Admission: AdmissionConfig{ RequestLimitMiB: defaultRequestLimitMiB, - WaiterLimit: defaultWaiterLimit, + WaitingLimitMiB: defaultWaitingLimitMiB, }, } } diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index 3208842382f9..b406619d0a6d 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -41,8 +41,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/apache/arrow/go/v17 v17.0.0 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect @@ -61,7 +59,6 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect - github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -69,7 +66,6 @@ require ( github.com/mostynb/go-grpc-compression v1.2.3 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/collector/config/configcompression v1.19.0 // indirect diff --git a/receiver/otelarrowreceiver/go.sum b/receiver/otelarrowreceiver/go.sum index bcb4f1920dda..4e59aad59a0d 100644 --- a/receiver/otelarrowreceiver/go.sum +++ b/receiver/otelarrowreceiver/go.sum @@ -7,12 +7,8 @@ github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -47,7 +43,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -69,8 +64,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -96,8 +89,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= -github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index 21d3a421bcf7..6b28f52d356b 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -38,7 +38,7 @@ import ( "google.golang.org/grpc/status" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" ) @@ -76,7 +76,7 @@ type Receiver struct { newConsumer func() arrowRecord.ConsumerAPI netReporter netstats.Interface telemetryBuilder *internalmetadata.TelemetryBuilder - boundedQueue admission.Queue + boundedQueue admission2.Queue } // receiverStream holds the inFlightWG for a single stream. @@ -93,7 +93,7 @@ func New( gsettings configgrpc.ServerConfig, authServer auth.Server, newConsumer func() arrowRecord.ConsumerAPI, - bq admission.Queue, + bq admission2.Queue, netReporter netstats.Interface, ) (*Receiver, error) { tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") @@ -452,6 +452,7 @@ type inFlightData struct { numItems int // how many items uncompSize int64 // uncompressed data size == how many bytes held in the semaphore + releaser admission2.ReleaseFunc } func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { @@ -500,10 +501,8 @@ func (id *inFlightData) anyDone(ctx context.Context) { id.span.End() - if id.uncompSize != 0 { - if err := id.boundedQueue.Release(id.uncompSize); err != nil { - id.telemetry.Logger.Error("release error", zap.Error(err)) - } + if id.releaser != nil { + id.releaser() } if id.uncompSize != 0 { @@ -631,12 +630,13 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre // immediately if there are too many waiters, or will // otherwise block until timeout or enough memory becomes // available. - acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize) + releaser, acquireErr := r.boundedQueue.Acquire(inflightCtx, uint64(uncompSize)) if acquireErr != nil { return acquireErr } flight.uncompSize = uncompSize flight.numItems = numItems + flight.releaser = releaser r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize) r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems)) diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 65d314bdb88c..57a92c7d922e 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -43,7 +43,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" @@ -51,8 +51,8 @@ import ( var noopTelemetry = componenttest.NewNopTelemetrySettings() -func defaultBQ() admission.Queue { - return admission.NewBoundedQueue(noopTelemetry, 100000, 10) +func defaultBQ() admission2.Queue { + return admission2.NewBoundedQueue(noopTelemetry, 100000, 10) } type ( @@ -362,7 +362,7 @@ func (ctc *commonTestCase) newOOMConsumer() arrowRecord.ConsumerAPI { return mock } -func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq admission.Queue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { +func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq admission2.Queue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { var authServer auth.Server var gsettings configgrpc.ServerConfig for _, gf := range opts { @@ -452,13 +452,20 @@ func TestBoundedQueueLimits(t *testing.T) { batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) require.NoError(t, err) - var bq admission.Queue + var bq admission2.Queue + // Note that this test exercises the case where there is or is not an + // error unrelated to pending data, thus we pass 0 in both cases as + // the WaitingLimitMiB below. + // + // There is an end-to-end test of admission control, including the + // ResourceExhausted status code we expect, in + // internal/otelarrow/test/e2e_test.go. if tt.expectErr { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10) + bq = admission2.NewBoundedQueue(noopTelemetry, uint64(sizer.TracesSize(td)-100), 0) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10) + bq = admission2.NewBoundedQueue(noopTelemetry, uint64(tt.admitLimit), 0) } ctc.start(ctc.newRealConsumer, bq) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 46bf68f3782a..6a4c76dbaeed 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" ) const dataFormatProtobuf = "protobuf" @@ -22,13 +22,13 @@ type Receiver struct { plogotlp.UnimplementedGRPCServer nextConsumer consumer.Logs obsrecv *receiverhelper.ObsReport - boundedQueue admission.Queue + boundedQueue admission2.Queue sizer *plog.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq admission2.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog ctx = r.obsrecv.StartLogsOp(ctx) var err error - sizeBytes := int64(r.sizer.LogsSize(req.Logs())) - if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + sizeBytes := uint64(r.sizer.LogsSize(req.Logs())) + if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { err = r.nextConsumer.ConsumeLogs(ctx, ld) - // Release() is not checked, see #36074. - _ = r.boundedQueue.Release(sizeBytes) // immediate release + releaser() // immediate release } else { err = acqErr } diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index a0bbb056f24c..d270ff138bfb 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -206,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) (net.Addr, *tracet ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(telset, maxBytes, 0) + bq := admission2.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 3eabffd43f02..51ffdb377fea 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" ) const dataFormatProtobuf = "protobuf" @@ -22,13 +22,13 @@ type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics obsrecv *receiverhelper.ObsReport - boundedQueue admission.Queue + boundedQueue admission2.Queue sizer *pmetric.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq admission2.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p ctx = r.obsrecv.StartMetricsOp(ctx) var err error - sizeBytes := int64(r.sizer.MetricsSize(req.Metrics())) - if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + sizeBytes := uint64(r.sizer.MetricsSize(req.Metrics())) + if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { err = r.nextConsumer.ConsumeMetrics(ctx, md) - // Release() is not checked, see #36074. - _ = r.boundedQueue.Release(sizeBytes) // immediate release + releaser() // immediate release } else { err = acqErr } diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 77a690963bac..a78378f54bd3 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -206,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, *tra ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(telset, maxBytes, 0) + bq := admission2.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 00744a4c31b4..d4d94b404622 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" ) const dataFormatProtobuf = "protobuf" @@ -22,13 +22,13 @@ type Receiver struct { ptraceotlp.UnimplementedGRPCServer nextConsumer consumer.Traces obsrecv *receiverhelper.ObsReport - boundedQueue admission.Queue + boundedQueue admission2.Queue sizer *ptrace.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq admission2.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt ctx = r.obsrecv.StartTracesOp(ctx) var err error - sizeBytes := int64(r.sizer.TracesSize(req.Traces())) - if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + sizeBytes := uint64(r.sizer.TracesSize(req.Traces())) + if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { err = r.nextConsumer.ConsumeTraces(ctx, td) - // Release() is not checked, see #36074. - _ = r.boundedQueue.Release(sizeBytes) // immediate release + releaser() // immediate release } else { err = acqErr } diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index ce769f5451c4..251e194dde3c 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -206,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, *trac ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(telset, maxBytes, 0) + bq := admission2.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 664d7a7d9d73..b21bea395e96 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow" @@ -45,7 +45,7 @@ type otelArrowReceiver struct { obsrepGRPC *receiverhelper.ObsReport netReporter *netstats.NetworkReporter - boundedQueue admission.Queue + boundedQueue admission2.Queue settings receiver.Settings } @@ -66,11 +66,11 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if err != nil { return nil, err } - var bq admission.Queue + var bq admission2.Queue if cfg.Admission.RequestLimitMiB == 0 { - bq = admission.NewUnboundedQueue() + bq = admission2.NewUnboundedQueue() } else { - bq = admission.NewBoundedQueue(set.TelemetrySettings, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) + bq = admission2.NewBoundedQueue(set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) } r := &otelArrowReceiver{ cfg: cfg, diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index e911cafdd0c5..fee89ad789e8 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -29,4 +29,4 @@ protocols: memory_limit_mib: 123 admission: request_limit_mib: 80 - waiter_limit: 100 \ No newline at end of file + waiting_limit_mib: 100