From 216257d27338f9e330199be784079634e331466c Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 1 Nov 2024 12:51:19 -0700 Subject: [PATCH 1/8] (otelarrowreceiver): LIFO-based admission control, waiting limit expressed in MiB of request data --- .chloggen/otelarrow-admission.yaml | 27 +++++++++++++++++++ internal/otelarrow/test/e2e_test.go | 7 +---- receiver/otelarrowreceiver/config.go | 7 ++--- receiver/otelarrowreceiver/config_test.go | 5 ++-- receiver/otelarrowreceiver/factory.go | 4 +-- .../otelarrowreceiver/internal/arrow/arrow.go | 10 +++---- .../internal/arrow/arrow_test.go | 11 ++++++-- .../otelarrowreceiver/internal/logs/otlp.go | 7 +++-- .../internal/metrics/otlp.go | 7 +++-- .../otelarrowreceiver/internal/trace/otlp.go | 7 +++-- receiver/otelarrowreceiver/otelarrow.go | 2 +- .../otelarrowreceiver/testdata/config.yaml | 2 +- 12 files changed, 59 insertions(+), 37 deletions(-) create mode 100644 .chloggen/otelarrow-admission.yaml diff --git a/.chloggen/otelarrow-admission.yaml b/.chloggen/otelarrow-admission.yaml new file mode 100644 index 000000000000..8bd92566d66c --- /dev/null +++ b/.chloggen/otelarrow-admission.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: otelarrowreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Admission control improvements (LIFO); admission.waiter_limit is deprecated, replaced with admission.waiting_limit_mib. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36074] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/otelarrow/test/e2e_test.go b/internal/otelarrow/test/e2e_test.go index 3b2e8e288a73..33c222567fd0 100644 --- a/internal/otelarrow/test/e2e_test.go +++ b/internal/otelarrow/test/e2e_test.go @@ -714,12 +714,7 @@ func TestIntegrationAdmissionLimited(t *testing.T) { testIntegrationTraces(ctx, t, params, func(ecfg *ExpConfig, rcfg *RecvConfig) { rcfg.Admission.RequestLimitMiB = admitLimit - - // Note: #36074 will change WaiterLimit to WaitingLimitMiB - // measured in bytes, not request count. This test is designed - // to work either way by virtue of having requests that are - // just shy of 1MiB. - rcfg.Admission.WaiterLimit = int64(waitingLimit) + rcfg.Admission.WaitingLimitMiB = waitingLimit ecfg.Arrow.NumStreams = 10 diff --git a/receiver/otelarrowreceiver/config.go b/receiver/otelarrowreceiver/config.go index abf09987e83e..863efb475a06 100644 --- a/receiver/otelarrowreceiver/config.go +++ b/receiver/otelarrowreceiver/config.go @@ -25,10 +25,10 @@ type AdmissionConfig struct { // for processing. When this field is zero, admission control is disabled. RequestLimitMiB uint64 `mapstructure:"request_limit_mib"` - // WaiterLimit is the limit on the number of waiters waiting to be processed and consumed. + // WaitingLimitMiB is the limit on the amount of data waiting to be consumed. // This is a dimension of memory limiting to ensure waiters are not consuming an // unexpectedly large amount of memory in the arrow receiver. - WaiterLimit int64 `mapstructure:"waiter_limit"` + WaitingLimitMiB uint64 `mapstructure:"waiting_limit_mib"` } // ArrowConfig support configuring the Arrow receiver. @@ -84,8 +84,5 @@ func (cfg *Config) Unmarshal(conf *confmap.Conf) error { if cfg.Admission.RequestLimitMiB == 0 && cfg.Arrow.DeprecatedAdmissionLimitMiB != 0 { cfg.Admission.RequestLimitMiB = cfg.Arrow.DeprecatedAdmissionLimitMiB } - if cfg.Admission.WaiterLimit == 0 && cfg.Arrow.DeprecatedWaiterLimit != 0 { - cfg.Admission.WaiterLimit = cfg.Arrow.DeprecatedWaiterLimit - } return nil } diff --git a/receiver/otelarrowreceiver/config_test.go b/receiver/otelarrowreceiver/config_test.go index 60edaf00cf61..1db7a1f32430 100644 --- a/receiver/otelarrowreceiver/config_test.go +++ b/receiver/otelarrowreceiver/config_test.go @@ -82,7 +82,7 @@ func TestUnmarshalConfig(t *testing.T) { }, Admission: AdmissionConfig{ RequestLimitMiB: 80, - WaiterLimit: 100, + WaitingLimitMiB: 100, }, }, cfg) @@ -107,7 +107,6 @@ func TestValidateDeprecatedConfig(t *testing.T) { Admission: AdmissionConfig{ // cfg.Validate should now set these fields. RequestLimitMiB: 80, - WaiterLimit: 100, }, }, cfg) } @@ -134,7 +133,7 @@ func TestUnmarshalConfigUnix(t *testing.T) { }, Admission: AdmissionConfig{ RequestLimitMiB: defaultRequestLimitMiB, - WaiterLimit: defaultWaiterLimit, + WaitingLimitMiB: defaultWaitingLimitMiB, }, }, cfg) } diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index 07ccc1bbbb1a..99e483ce6888 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -21,7 +21,7 @@ const ( defaultMemoryLimitMiB = 128 defaultRequestLimitMiB = 128 - defaultWaiterLimit = 1000 + defaultWaitingLimitMiB = 1000 ) // NewFactory creates a new OTel-Arrow receiver factory. @@ -52,7 +52,7 @@ func createDefaultConfig() component.Config { }, Admission: AdmissionConfig{ RequestLimitMiB: defaultRequestLimitMiB, - WaiterLimit: defaultWaiterLimit, + WaitingLimitMiB: defaultWaitingLimitMiB, }, } } diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index 13fdcd2395f8..3a84fee51539 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -453,6 +453,7 @@ type inFlightData struct { numItems int // how many items uncompSize int64 // uncompressed data size == how many bytes held in the semaphore + releaser admission.ReleaseFunc } func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { @@ -501,10 +502,8 @@ func (id *inFlightData) anyDone(ctx context.Context) { id.span.End() - if id.uncompSize != 0 { - if err := id.boundedQueue.Release(id.uncompSize); err != nil { - id.telemetry.Logger.Error("release error", zap.Error(err)) - } + if id.releaser != nil { + id.releaser() } if id.uncompSize != 0 { @@ -635,12 +634,13 @@ func (r *receiverStream) recvOne(streamCtx context.Context, serverStream anyStre // immediately if there are too many waiters, or will // otherwise block until timeout or enough memory becomes // available. - acquireErr := r.boundedQueue.Acquire(inflightCtx, uncompSize) + releaser, acquireErr := r.boundedQueue.Acquire(inflightCtx, uint64(uncompSize)) if acquireErr != nil { return acquireErr } flight.uncompSize = uncompSize flight.numItems = numItems + flight.releaser = releaser r.telemetryBuilder.OtelArrowReceiverInFlightBytes.Add(inflightCtx, uncompSize) r.telemetryBuilder.OtelArrowReceiverInFlightItems.Add(inflightCtx, int64(numItems)) diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index dcbe0f8546d3..b5d2c2d31f17 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -449,12 +449,19 @@ func TestBoundedQueueLimits(t *testing.T) { require.NoError(t, err) var bq admission.Queue + // Note that this test exercises the case where there is or is not an + // error unrelated to pending data, thus we pass 0 in both cases as + // the WaitingLimitMiB below. + // + // There is an end-to-end test of admission control, including the + // ResourceExhausted status code we expect, in + // internal/otelarrow/test/e2e_test.go. if tt.expectErr { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission.NewBoundedQueue(noopTelemetry, int64(sizer.TracesSize(td)-100), 10) + bq = admission.NewBoundedQueue(noopTelemetry, uint64(sizer.TracesSize(td)-100), 0) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(noopTelemetry, tt.admitLimit, 10) + bq = admission.NewBoundedQueue(noopTelemetry, uint64(tt.admitLimit), 0) } ctc.start(ctc.newRealConsumer, bq) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 46bf68f3782a..6ca418019fae 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req plogotlp.ExportRequest) (plog ctx = r.obsrecv.StartLogsOp(ctx) var err error - sizeBytes := int64(r.sizer.LogsSize(req.Logs())) - if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + sizeBytes := uint64(r.sizer.LogsSize(req.Logs())) + if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { err = r.nextConsumer.ConsumeLogs(ctx, ld) - // Release() is not checked, see #36074. - _ = r.boundedQueue.Release(sizeBytes) // immediate release + releaser() // immediate release } else { err = acqErr } diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 3eabffd43f02..7b11134c2031 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req pmetricotlp.ExportRequest) (p ctx = r.obsrecv.StartMetricsOp(ctx) var err error - sizeBytes := int64(r.sizer.MetricsSize(req.Metrics())) - if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + sizeBytes := uint64(r.sizer.MetricsSize(req.Metrics())) + if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { err = r.nextConsumer.ConsumeMetrics(ctx, md) - // Release() is not checked, see #36074. - _ = r.boundedQueue.Release(sizeBytes) // immediate release + releaser() // immediate release } else { err = acqErr } diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index 00744a4c31b4..af066b2682e8 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -49,11 +49,10 @@ func (r *Receiver) Export(ctx context.Context, req ptraceotlp.ExportRequest) (pt ctx = r.obsrecv.StartTracesOp(ctx) var err error - sizeBytes := int64(r.sizer.TracesSize(req.Traces())) - if acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { + sizeBytes := uint64(r.sizer.TracesSize(req.Traces())) + if releaser, acqErr := r.boundedQueue.Acquire(ctx, sizeBytes); acqErr == nil { err = r.nextConsumer.ConsumeTraces(ctx, td) - // Release() is not checked, see #36074. - _ = r.boundedQueue.Release(sizeBytes) // immediate release + releaser() // immediate release } else { err = acqErr } diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index ef5ce6f7f9a0..62ef6ae803c0 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -70,7 +70,7 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if cfg.Admission.RequestLimitMiB == 0 { bq = admission.NewUnboundedQueue() } else { - bq = admission.NewBoundedQueue(set.TelemetrySettings, int64(cfg.Admission.RequestLimitMiB<<20), cfg.Admission.WaiterLimit) + bq = admission.NewBoundedQueue(set.TelemetrySettings, uint64(cfg.Admission.RequestLimitMiB<<20), uint64(cfg.Admission.WaitingLimitMiB<<20)) } r := &otelArrowReceiver{ cfg: cfg, diff --git a/receiver/otelarrowreceiver/testdata/config.yaml b/receiver/otelarrowreceiver/testdata/config.yaml index e911cafdd0c5..fee89ad789e8 100644 --- a/receiver/otelarrowreceiver/testdata/config.yaml +++ b/receiver/otelarrowreceiver/testdata/config.yaml @@ -29,4 +29,4 @@ protocols: memory_limit_mib: 123 admission: request_limit_mib: 80 - waiter_limit: 100 \ No newline at end of file + waiting_limit_mib: 100 From 182f1eb917253fcd1c5cbfb8a642c7321e122e70 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Fri, 1 Nov 2024 12:53:23 -0700 Subject: [PATCH 2/8] README --- receiver/otelarrowreceiver/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index 0903695852de..e43742fbd0ac 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -82,11 +82,11 @@ Several common configuration structures provide additional capabilities automati In the `admission` configuration block the following settings are available: -- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib` which limits allocations made by the consumer when translating arrow records into pdata objects. i.e. request size is used to control how much traffic we admit, but does not control how much memory is used during request processing. When this field is set to zero, admission control is disabled. +- `request_limit_mib` (default: 128): limits the number of requests that are received by the stream in terms of *uncompressed request size*. This should not be confused with `arrow.memory_limit_mib`, which limits allocations made by the consumer when translating arrow records into pdata objects. The `request_limit_mib` value is used to control how much traffic we admit, but does not control how much memory is used during request processing. -- `waiter_limit` (default: 1000): limits the number of requests waiting on admission once `admission_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. +- `waiting_limit_mib` (default: 32): limits the number of requests waiting on admission after `request_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. -`request_limit_mib` and `waiter_limit` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. +`request_limit_mib` and `waiting_limit_mib` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. ### Arrow-specific Configuration From 3d883170f286b690bbcc94ffca5f291bf33c6f39 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 Nov 2024 09:00:16 -0800 Subject: [PATCH 3/8] remove old --- internal/otelarrow/admission/README.md | 20 -- internal/otelarrow/admission/boundedqueue.go | 164 ------------- .../otelarrow/admission/boundedqueue_test.go | 219 ------------------ internal/otelarrow/admission/controller.go | 59 ----- 4 files changed, 462 deletions(-) delete mode 100644 internal/otelarrow/admission/README.md delete mode 100644 internal/otelarrow/admission/boundedqueue.go delete mode 100644 internal/otelarrow/admission/boundedqueue_test.go delete mode 100644 internal/otelarrow/admission/controller.go diff --git a/internal/otelarrow/admission/README.md b/internal/otelarrow/admission/README.md deleted file mode 100644 index 053ad05a8fcf..000000000000 --- a/internal/otelarrow/admission/README.md +++ /dev/null @@ -1,20 +0,0 @@ -# Admission Package - -## Overview - -The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`. - -This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues: -1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request. -2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage. - -## Usage - -Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiters)` - -Within the component call `bq.Acquire(ctx, requestSize)` which will either -1. succeed immediately if there is enough available memory -2. fail immediately if there are too many waiters -3. block until context cancelation or enough bytes becomes available - -Once a request has finished processing and is sent downstream call `bq.Release(requestSize)` to allow waiters to be admitted for processing. Release should only fail if releasing more bytes than previously acquired. \ No newline at end of file diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go deleted file mode 100644 index 77a270a7d3dd..000000000000 --- a/internal/otelarrow/admission/boundedqueue.go +++ /dev/null @@ -1,164 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" - -import ( - "context" - "fmt" - "sync" - - "github.com/google/uuid" - orderedmap "github.com/wk8/go-ordered-map/v2" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - grpccodes "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" -) - -var ErrTooManyWaiters = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data") -var ErrRequestTooLarge = status.Error(grpccodes.InvalidArgument, "rejecting request, request is too large") - -type BoundedQueue struct { - maxLimitBytes int64 - maxLimitWaiters int64 - currentBytes int64 - currentWaiters int64 - lock sync.Mutex - waiters *orderedmap.OrderedMap[uuid.UUID, waiter] - tracer trace.Tracer -} - -type waiter struct { - readyCh chan struct{} - pendingBytes int64 - ID uuid.UUID -} - -func NewBoundedQueue(ts component.TelemetrySettings, maxLimitBytes, maxLimitWaiters int64) Queue { - return &BoundedQueue{ - maxLimitBytes: maxLimitBytes, - maxLimitWaiters: maxLimitWaiters, - waiters: orderedmap.New[uuid.UUID, waiter](), - tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"), - } -} - -func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) { - bq.lock.Lock() - defer bq.lock.Unlock() - - if pendingBytes > bq.maxLimitBytes { // will never succeed - return false, ErrRequestTooLarge - } - - if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit - bq.currentBytes += pendingBytes - return true, nil - } - - // since we were unable to admit, check if we can wait. - if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters - return false, ErrTooManyWaiters - } - - // if we got to this point we need to wait to acquire bytes, so update currentWaiters before releasing mutex. - bq.currentWaiters++ - return false, nil -} - -func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error { - success, err := bq.admit(pendingBytes) - if err != nil || success { - return err - } - - // otherwise we need to wait for bytes to be released - curWaiter := waiter{ - pendingBytes: pendingBytes, - readyCh: make(chan struct{}), - } - - bq.lock.Lock() - - // generate unique key - for { - id := uuid.New() - _, keyExists := bq.waiters.Get(id) - if keyExists { - continue - } - bq.waiters.Set(id, curWaiter) - curWaiter.ID = id - break - } - - bq.lock.Unlock() - ctx, span := bq.tracer.Start(ctx, "admission_blocked", - trace.WithAttributes(attribute.Int64("pending", pendingBytes))) - defer span.End() - - select { - case <-curWaiter.readyCh: - return nil - case <-ctx.Done(): - // canceled before acquired so remove waiter. - bq.lock.Lock() - defer bq.lock.Unlock() - err = fmt.Errorf("context canceled: %w ", ctx.Err()) - span.SetStatus(codes.Error, "context canceled") - - _, found := bq.waiters.Delete(curWaiter.ID) - if !found { - return err - } - - bq.currentWaiters-- - return err - } -} - -func (bq *BoundedQueue) Release(pendingBytes int64) error { - bq.lock.Lock() - defer bq.lock.Unlock() - - bq.currentBytes -= pendingBytes - - if bq.currentBytes < 0 { - return fmt.Errorf("released more bytes than acquired") - } - - for { - if bq.waiters.Len() == 0 { - return nil - } - next := bq.waiters.Oldest() - nextWaiter := next.Value - nextKey := next.Key - if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes { - bq.currentBytes += nextWaiter.pendingBytes - bq.currentWaiters-- - close(nextWaiter.readyCh) - _, found := bq.waiters.Delete(nextKey) - if !found { - return fmt.Errorf("deleting waiter that doesn't exist") - } - continue - } - break - } - - return nil -} - -func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool { - bq.lock.Lock() - defer bq.lock.Unlock() - if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { - bq.currentBytes += pendingBytes - return true - } - return false -} diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go deleted file mode 100644 index ffd616018ea2..000000000000 --- a/internal/otelarrow/admission/boundedqueue_test.go +++ /dev/null @@ -1,219 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package admission - -import ( - "context" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.uber.org/multierr" -) - -func min(x, y int64) int64 { - if x <= y { - return x - } - return y -} - -func max(x, y int64) int64 { - if x >= y { - return x - } - return y -} - -func abs(x int64) int64 { - if x < 0 { - return -x - } - return x -} - -var noopTelemetry = componenttest.NewNopTelemetrySettings() - -func newTestQueue(limitBytes, limitWaiters int64) *BoundedQueue { - return NewBoundedQueue(noopTelemetry, limitBytes, limitWaiters).(*BoundedQueue) -} - -func TestAcquireSimpleNoWaiters(t *testing.T) { - maxLimitBytes := 1000 - maxLimitWaiters := 10 - numRequests := 40 - requestSize := 21 - - bq := newTestQueue(int64(maxLimitBytes), int64(maxLimitWaiters)) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - for i := 0; i < numRequests; i++ { - go func() { - err := bq.Acquire(ctx, int64(requestSize)) - assert.NoError(t, err) - }() - } - - require.Never(t, func() bool { - return bq.waiters.Len() > 0 - }, 2*time.Second, 10*time.Millisecond) - - for i := 0; i < numRequests; i++ { - assert.NoError(t, bq.Release(int64(requestSize))) - assert.Equal(t, int64(0), bq.currentWaiters) - } - - assert.ErrorContains(t, bq.Release(int64(1)), "released more bytes than acquired") - assert.NoError(t, bq.Acquire(ctx, int64(maxLimitBytes))) -} - -func TestAcquireBoundedWithWaiters(t *testing.T) { - tests := []struct { - name string - maxLimitBytes int64 - maxLimitWaiters int64 - numRequests int64 - requestSize int64 - timeout time.Duration - }{ - { - name: "below max waiters above max bytes", - maxLimitBytes: 1000, - maxLimitWaiters: 100, - numRequests: 100, - requestSize: 21, - timeout: 5 * time.Second, - }, - { - name: "above max waiters above max bytes", - maxLimitBytes: 1000, - maxLimitWaiters: 100, - numRequests: 200, - requestSize: 21, - timeout: 5 * time.Second, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - bq := newTestQueue(tt.maxLimitBytes, tt.maxLimitWaiters) - var blockedRequests int64 - numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize - requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked) - tooManyWaiters := requestsAboveLimit > tt.maxLimitWaiters - numRejected := max(requestsAboveLimit-tt.maxLimitWaiters, int64(0)) - - // There should never be more blocked requests than maxLimitWaiters. - blockedRequests = min(tt.maxLimitWaiters, requestsAboveLimit) - - ctx, cancel := context.WithTimeout(context.Background(), tt.timeout) - defer cancel() - var errs error - for i := 0; i < int(tt.numRequests); i++ { - go func() { - err := bq.Acquire(ctx, tt.requestSize) - bq.lock.Lock() - defer bq.lock.Unlock() - errs = multierr.Append(errs, err) - }() - } - - require.Eventually(t, func() bool { - bq.lock.Lock() - defer bq.lock.Unlock() - return bq.waiters.Len() == int(blockedRequests) - }, 3*time.Second, 10*time.Millisecond) - - assert.NoError(t, bq.Release(tt.requestSize)) - assert.Equal(t, bq.waiters.Len(), int(blockedRequests)-1) - - for i := 0; i < int(tt.numRequests-numRejected)-1; i++ { - assert.NoError(t, bq.Release(tt.requestSize)) - } - - bq.lock.Lock() - if tooManyWaiters { - assert.ErrorContains(t, errs, ErrTooManyWaiters.Error()) - } else { - assert.NoError(t, errs) - } - bq.lock.Unlock() - - // confirm all bytes were released by acquiring maxLimitBytes. - assert.True(t, bq.TryAcquire(tt.maxLimitBytes)) - }) - } -} - -func TestAcquireContextCanceled(t *testing.T) { - maxLimitBytes := 1000 - maxLimitWaiters := 100 - numRequests := 100 - requestSize := 21 - numReqsUntilBlocked := maxLimitBytes / requestSize - requestsAboveLimit := abs(int64(numRequests) - int64(numReqsUntilBlocked)) - - blockedRequests := min(int64(maxLimitWaiters), requestsAboveLimit) - - exp := tracetest.NewInMemoryExporter() - tp := trace.NewTracerProvider(trace.WithSyncer(exp)) - ts := noopTelemetry - ts.TracerProvider = tp - - bq := NewBoundedQueue(ts, int64(maxLimitBytes), int64(maxLimitWaiters)).(*BoundedQueue) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - var errs error - var wg sync.WaitGroup - for i := 0; i < numRequests; i++ { - wg.Add(1) - go func() { - err := bq.Acquire(ctx, int64(requestSize)) - bq.lock.Lock() - defer bq.lock.Unlock() - errs = multierr.Append(errs, err) - wg.Done() - }() - } - - // Wait until all calls to Acquire() happen and we have the expected number of waiters. - require.Eventually(t, func() bool { - bq.lock.Lock() - defer bq.lock.Unlock() - return bq.waiters.Len() == int(blockedRequests) - }, 3*time.Second, 10*time.Millisecond) - - cancel() - wg.Wait() - assert.ErrorContains(t, errs, "context canceled") - - // Expect spans named admission_blocked w/ context canceled. - spans := exp.GetSpans() - exp.Reset() - assert.NotEmpty(t, spans) - for _, span := range spans { - assert.Equal(t, "admission_blocked", span.Name) - assert.Equal(t, codes.Error, span.Status.Code) - assert.Equal(t, "context canceled", span.Status.Description) - } - - // Now all waiters should have returned and been removed. - assert.Equal(t, 0, bq.waiters.Len()) - - for i := 0; i < numReqsUntilBlocked; i++ { - assert.NoError(t, bq.Release(int64(requestSize))) - assert.Equal(t, int64(0), bq.currentWaiters) - } - assert.True(t, bq.TryAcquire(int64(maxLimitBytes))) - - // Expect no more spans, because admission was not blocked. - spans = exp.GetSpans() - require.Empty(t, spans) -} diff --git a/internal/otelarrow/admission/controller.go b/internal/otelarrow/admission/controller.go deleted file mode 100644 index 989c4a4467cb..000000000000 --- a/internal/otelarrow/admission/controller.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" - -import ( - "context" -) - -// Queue is a weighted admission queue interface. -type Queue interface { - // Acquire asks the controller to admit the caller. - // - // The weight parameter specifies how large of an admission to make. - // This might be used on the bytes of request (for example) to differentiate - // between large and small requests. - // - // Admit will return when one of the following events occurs: - // - // (1) admission is allowed, or - // (2) the provided ctx becomes canceled, or - // (3) there are so many existing waiters that the - // controller decides to reject this caller without - // admitting it. - // - // In case (1), the return value will be a non-nil error. The - // caller must invoke Release(weight) after it is finished - // with the resource being guarded by the admission - // controller. - // - // In case (2), the return value will be a Cancelled or - // DeadlineExceeded error. - // - // In case (3), the return value will be a ResourceExhausted - // error. - Acquire(ctx context.Context, weight int64) error - - // Release will be eliminated as part of issue #36074. - Release(weight int64) error -} - -type noopController struct{} - -var _ Queue = noopController{} - -// NewUnboundedQueue returns a no-op implementation of the Queue interface. -func NewUnboundedQueue() Queue { - return noopController{} -} - -// Acquire implements Queue. -func (noopController) Acquire(_ context.Context, _ int64) error { - return nil -} - -// Acquire implements Queue. -func (noopController) Release(_ int64) error { - return nil -} From 4dd869cc0a74c7e68992c760c43399e3e53d4ebe Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 Nov 2024 09:00:26 -0800 Subject: [PATCH 4/8] rename new --- internal/otelarrow/{admission2 => admission}/README.md | 0 internal/otelarrow/{admission2 => admission}/boundedqueue.go | 0 internal/otelarrow/{admission2 => admission}/boundedqueue_test.go | 0 internal/otelarrow/{admission2 => admission}/controller.go | 0 internal/otelarrow/{admission2 => admission}/notification.go | 0 internal/otelarrow/{admission2 => admission}/notification_test.go | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename internal/otelarrow/{admission2 => admission}/README.md (100%) rename internal/otelarrow/{admission2 => admission}/boundedqueue.go (100%) rename internal/otelarrow/{admission2 => admission}/boundedqueue_test.go (100%) rename internal/otelarrow/{admission2 => admission}/controller.go (100%) rename internal/otelarrow/{admission2 => admission}/notification.go (100%) rename internal/otelarrow/{admission2 => admission}/notification_test.go (100%) diff --git a/internal/otelarrow/admission2/README.md b/internal/otelarrow/admission/README.md similarity index 100% rename from internal/otelarrow/admission2/README.md rename to internal/otelarrow/admission/README.md diff --git a/internal/otelarrow/admission2/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go similarity index 100% rename from internal/otelarrow/admission2/boundedqueue.go rename to internal/otelarrow/admission/boundedqueue.go diff --git a/internal/otelarrow/admission2/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go similarity index 100% rename from internal/otelarrow/admission2/boundedqueue_test.go rename to internal/otelarrow/admission/boundedqueue_test.go diff --git a/internal/otelarrow/admission2/controller.go b/internal/otelarrow/admission/controller.go similarity index 100% rename from internal/otelarrow/admission2/controller.go rename to internal/otelarrow/admission/controller.go diff --git a/internal/otelarrow/admission2/notification.go b/internal/otelarrow/admission/notification.go similarity index 100% rename from internal/otelarrow/admission2/notification.go rename to internal/otelarrow/admission/notification.go diff --git a/internal/otelarrow/admission2/notification_test.go b/internal/otelarrow/admission/notification_test.go similarity index 100% rename from internal/otelarrow/admission2/notification_test.go rename to internal/otelarrow/admission/notification_test.go From e81145d926b8727de9a9df979457f63cfedb7e0a Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 Nov 2024 09:09:11 -0800 Subject: [PATCH 5/8] lint --- internal/otelarrow/admission/boundedqueue.go | 2 +- internal/otelarrow/admission/boundedqueue_test.go | 2 +- internal/otelarrow/admission/controller.go | 2 +- internal/otelarrow/admission/notification.go | 2 +- internal/otelarrow/admission/notification_test.go | 2 +- receiver/otelarrowreceiver/otelarrow.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission/boundedqueue.go index e0cd95e3bb2c..9ee9ddfb8735 100644 --- a/internal/otelarrow/admission/boundedqueue.go +++ b/internal/otelarrow/admission/boundedqueue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" +package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" import ( "container/list" diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission/boundedqueue_test.go index 8903e8b90773..5812d07724d4 100644 --- a/internal/otelarrow/admission/boundedqueue_test.go +++ b/internal/otelarrow/admission/boundedqueue_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission2 +package admission import ( "context" diff --git a/internal/otelarrow/admission/controller.go b/internal/otelarrow/admission/controller.go index 6999a10838e9..0970834811a9 100644 --- a/internal/otelarrow/admission/controller.go +++ b/internal/otelarrow/admission/controller.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" +package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" import ( "context" diff --git a/internal/otelarrow/admission/notification.go b/internal/otelarrow/admission/notification.go index efbf66143548..bf8cd9b2884b 100644 --- a/internal/otelarrow/admission/notification.go +++ b/internal/otelarrow/admission/notification.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" +package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" // notification.N is a minimal Go version of absl::Notification: // diff --git a/internal/otelarrow/admission/notification_test.go b/internal/otelarrow/admission/notification_test.go index 1e66d8445a41..90fe61defb06 100644 --- a/internal/otelarrow/admission/notification_test.go +++ b/internal/otelarrow/admission/notification_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission2 +package admission import ( "testing" diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 62ef6ae803c0..5af11b232ca9 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -70,7 +70,7 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if cfg.Admission.RequestLimitMiB == 0 { bq = admission.NewUnboundedQueue() } else { - bq = admission.NewBoundedQueue(set.TelemetrySettings, uint64(cfg.Admission.RequestLimitMiB<<20), uint64(cfg.Admission.WaitingLimitMiB<<20)) + bq = admission.NewBoundedQueue(set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) } r := &otelArrowReceiver{ cfg: cfg, From 041e522b6f321dec802ac15486023b2b1151e7c7 Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 Nov 2024 09:19:52 -0800 Subject: [PATCH 6/8] keep the name admission2 --- .../otelarrow/{admission => admission2}/README.md | 0 .../{admission => admission2}/boundedqueue.go | 2 +- .../{admission => admission2}/boundedqueue_test.go | 2 +- .../{admission => admission2}/controller.go | 2 +- .../{admission => admission2}/notification.go | 2 +- .../{admission => admission2}/notification_test.go | 2 +- receiver/otelarrowreceiver/README.md | 2 +- receiver/otelarrowreceiver/internal/arrow/arrow.go | 8 ++++---- .../otelarrowreceiver/internal/arrow/arrow_test.go | 14 +++++++------- receiver/otelarrowreceiver/internal/logs/otlp.go | 6 +++--- .../otelarrowreceiver/internal/logs/otlp_test.go | 4 ++-- .../otelarrowreceiver/internal/metrics/otlp.go | 6 +++--- .../internal/metrics/otlp_test.go | 4 ++-- receiver/otelarrowreceiver/internal/trace/otlp.go | 6 +++--- .../otelarrowreceiver/internal/trace/otlp_test.go | 4 ++-- receiver/otelarrowreceiver/otelarrow.go | 10 +++++----- 16 files changed, 37 insertions(+), 37 deletions(-) rename internal/otelarrow/{admission => admission2}/README.md (100%) rename internal/otelarrow/{admission => admission2}/boundedqueue.go (97%) rename internal/otelarrow/{admission => admission2}/boundedqueue_test.go (99%) rename internal/otelarrow/{admission => admission2}/controller.go (93%) rename internal/otelarrow/{admission => admission2}/notification.go (87%) rename internal/otelarrow/{admission => admission2}/notification_test.go (97%) diff --git a/internal/otelarrow/admission/README.md b/internal/otelarrow/admission2/README.md similarity index 100% rename from internal/otelarrow/admission/README.md rename to internal/otelarrow/admission2/README.md diff --git a/internal/otelarrow/admission/boundedqueue.go b/internal/otelarrow/admission2/boundedqueue.go similarity index 97% rename from internal/otelarrow/admission/boundedqueue.go rename to internal/otelarrow/admission2/boundedqueue.go index 9ee9ddfb8735..e0cd95e3bb2c 100644 --- a/internal/otelarrow/admission/boundedqueue.go +++ b/internal/otelarrow/admission2/boundedqueue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" +package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" import ( "container/list" diff --git a/internal/otelarrow/admission/boundedqueue_test.go b/internal/otelarrow/admission2/boundedqueue_test.go similarity index 99% rename from internal/otelarrow/admission/boundedqueue_test.go rename to internal/otelarrow/admission2/boundedqueue_test.go index 5812d07724d4..8903e8b90773 100644 --- a/internal/otelarrow/admission/boundedqueue_test.go +++ b/internal/otelarrow/admission2/boundedqueue_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission +package admission2 import ( "context" diff --git a/internal/otelarrow/admission/controller.go b/internal/otelarrow/admission2/controller.go similarity index 93% rename from internal/otelarrow/admission/controller.go rename to internal/otelarrow/admission2/controller.go index 0970834811a9..6999a10838e9 100644 --- a/internal/otelarrow/admission/controller.go +++ b/internal/otelarrow/admission2/controller.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" +package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" import ( "context" diff --git a/internal/otelarrow/admission/notification.go b/internal/otelarrow/admission2/notification.go similarity index 87% rename from internal/otelarrow/admission/notification.go rename to internal/otelarrow/admission2/notification.go index bf8cd9b2884b..efbf66143548 100644 --- a/internal/otelarrow/admission/notification.go +++ b/internal/otelarrow/admission2/notification.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" +package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" // notification.N is a minimal Go version of absl::Notification: // diff --git a/internal/otelarrow/admission/notification_test.go b/internal/otelarrow/admission2/notification_test.go similarity index 97% rename from internal/otelarrow/admission/notification_test.go rename to internal/otelarrow/admission2/notification_test.go index 90fe61defb06..1e66d8445a41 100644 --- a/internal/otelarrow/admission/notification_test.go +++ b/internal/otelarrow/admission2/notification_test.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package admission +package admission2 import ( "testing" diff --git a/receiver/otelarrowreceiver/README.md b/receiver/otelarrowreceiver/README.md index 4fc505c545c2..2088caec1bb0 100644 --- a/receiver/otelarrowreceiver/README.md +++ b/receiver/otelarrowreceiver/README.md @@ -86,7 +86,7 @@ In the `admission` configuration block the following settings are available: - `waiting_limit_mib` (default: 32): limits the number of requests waiting on admission after `request_limit_mib` is reached. This is another dimension of memory limiting that ensures waiters are not holding onto a significant amount of memory while waiting to be processed. -`request_limit_mib` and `waiting_limit_mib` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. +`request_limit_mib` and `waiting_limit_mib` are arguments supplied to [admission.BoundedQueue](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/internal/otelarrow/admission2). This custom semaphore is meant to be used within receivers to help limit memory within the collector pipeline. ### Arrow-specific Configuration diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow.go b/receiver/otelarrowreceiver/internal/arrow/arrow.go index a56bb72ced88..6b28f52d356b 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow.go @@ -38,7 +38,7 @@ import ( "google.golang.org/grpc/status" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/grpcutil" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" internalmetadata "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/metadata" ) @@ -76,7 +76,7 @@ type Receiver struct { newConsumer func() arrowRecord.ConsumerAPI netReporter netstats.Interface telemetryBuilder *internalmetadata.TelemetryBuilder - boundedQueue admission.Queue + boundedQueue admission2.Queue } // receiverStream holds the inFlightWG for a single stream. @@ -93,7 +93,7 @@ func New( gsettings configgrpc.ServerConfig, authServer auth.Server, newConsumer func() arrowRecord.ConsumerAPI, - bq admission.Queue, + bq admission2.Queue, netReporter netstats.Interface, ) (*Receiver, error) { tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver") @@ -452,7 +452,7 @@ type inFlightData struct { numItems int // how many items uncompSize int64 // uncompressed data size == how many bytes held in the semaphore - releaser admission.ReleaseFunc + releaser admission2.ReleaseFunc } func (id *inFlightData) recvDone(ctx context.Context, recvErrPtr *error) { diff --git a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go index 57f9d9cdaa77..2d485700c961 100644 --- a/receiver/otelarrowreceiver/internal/arrow/arrow_test.go +++ b/receiver/otelarrowreceiver/internal/arrow/arrow_test.go @@ -43,7 +43,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow/mock" @@ -51,8 +51,8 @@ import ( var noopTelemetry = componenttest.NewNopTelemetrySettings() -func defaultBQ() admission.Queue { - return admission.NewBoundedQueue(noopTelemetry, 100000, 10) +func defaultBQ() admission2.Queue { + return admission2.NewBoundedQueue(noopTelemetry, 100000, 10) } type compareJSONTraces struct{ ptrace.Traces } @@ -359,7 +359,7 @@ func (ctc *commonTestCase) newOOMConsumer() arrowRecord.ConsumerAPI { return mock } -func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq admission.Queue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { +func (ctc *commonTestCase) start(newConsumer func() arrowRecord.ConsumerAPI, bq admission2.Queue, opts ...func(*configgrpc.ServerConfig, *auth.Server)) { var authServer auth.Server var gsettings configgrpc.ServerConfig for _, gf := range opts { @@ -449,7 +449,7 @@ func TestBoundedQueueLimits(t *testing.T) { batch, err := ctc.testProducer.BatchArrowRecordsFromTraces(td) require.NoError(t, err) - var bq admission.Queue + var bq admission2.Queue // Note that this test exercises the case where there is or is not an // error unrelated to pending data, thus we pass 0 in both cases as // the WaitingLimitMiB below. @@ -459,10 +459,10 @@ func TestBoundedQueueLimits(t *testing.T) { // internal/otelarrow/test/e2e_test.go. if tt.expectErr { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(0) - bq = admission.NewBoundedQueue(noopTelemetry, uint64(sizer.TracesSize(td)-100), 0) + bq = admission2.NewBoundedQueue(noopTelemetry, uint64(sizer.TracesSize(td)-100), 0) } else { ctc.stream.EXPECT().Send(statusOKFor(batch.BatchId)).Times(1).Return(nil) - bq = admission.NewBoundedQueue(noopTelemetry, uint64(tt.admitLimit), 0) + bq = admission2.NewBoundedQueue(noopTelemetry, uint64(tt.admitLimit), 0) } ctc.start(ctc.newRealConsumer, bq) diff --git a/receiver/otelarrowreceiver/internal/logs/otlp.go b/receiver/otelarrowreceiver/internal/logs/otlp.go index 6ca418019fae..6a4c76dbaeed 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" ) const dataFormatProtobuf = "protobuf" @@ -22,13 +22,13 @@ type Receiver struct { plogotlp.UnimplementedGRPCServer nextConsumer consumer.Logs obsrecv *receiverhelper.ObsReport - boundedQueue admission.Queue + boundedQueue admission2.Queue sizer *plog.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Logs, obsrecv *receiverhelper.ObsReport, bq admission2.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, diff --git a/receiver/otelarrowreceiver/internal/logs/otlp_test.go b/receiver/otelarrowreceiver/internal/logs/otlp_test.go index a0bbb056f24c..d270ff138bfb 100644 --- a/receiver/otelarrowreceiver/internal/logs/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/logs/otlp_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -206,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, lc consumer.Logs) (net.Addr, *tracet ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(telset, maxBytes, 0) + bq := admission2.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), lc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp.go b/receiver/otelarrowreceiver/internal/metrics/otlp.go index 7b11134c2031..51ffdb377fea 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" ) const dataFormatProtobuf = "protobuf" @@ -22,13 +22,13 @@ type Receiver struct { pmetricotlp.UnimplementedGRPCServer nextConsumer consumer.Metrics obsrecv *receiverhelper.ObsReport - boundedQueue admission.Queue + boundedQueue admission2.Queue sizer *pmetric.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Metrics, obsrecv *receiverhelper.ObsReport, bq admission2.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, diff --git a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go index 77a690963bac..a78378f54bd3 100644 --- a/receiver/otelarrowreceiver/internal/metrics/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/metrics/otlp_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -206,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, mc consumer.Metrics) (net.Addr, *tra ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(telset, maxBytes, 0) + bq := admission2.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), mc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/internal/trace/otlp.go b/receiver/otelarrowreceiver/internal/trace/otlp.go index af066b2682e8..d4d94b404622 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" ) const dataFormatProtobuf = "protobuf" @@ -22,13 +22,13 @@ type Receiver struct { ptraceotlp.UnimplementedGRPCServer nextConsumer consumer.Traces obsrecv *receiverhelper.ObsReport - boundedQueue admission.Queue + boundedQueue admission2.Queue sizer *ptrace.ProtoMarshaler logger *zap.Logger } // New creates a new Receiver reference. -func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq admission.Queue) *Receiver { +func New(logger *zap.Logger, nextConsumer consumer.Traces, obsrecv *receiverhelper.ObsReport, bq admission2.Queue) *Receiver { return &Receiver{ nextConsumer: nextConsumer, obsrecv: obsrecv, diff --git a/receiver/otelarrowreceiver/internal/trace/otlp_test.go b/receiver/otelarrowreceiver/internal/trace/otlp_test.go index ce769f5451c4..251e194dde3c 100644 --- a/receiver/otelarrowreceiver/internal/trace/otlp_test.go +++ b/receiver/otelarrowreceiver/internal/trace/otlp_test.go @@ -29,7 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/testdata" ) @@ -206,7 +206,7 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc consumer.Traces) (net.Addr, *trac ReceiverCreateSettings: set, }) require.NoError(t, err) - bq := admission.NewBoundedQueue(telset, maxBytes, 0) + bq := admission2.NewBoundedQueue(telset, maxBytes, 0) r := New(zap.NewNop(), tc, obsrecv, bq) // Now run it as a gRPC server srv := grpc.NewServer() diff --git a/receiver/otelarrowreceiver/otelarrow.go b/receiver/otelarrowreceiver/otelarrow.go index 5af11b232ca9..f6574ed80062 100644 --- a/receiver/otelarrowreceiver/otelarrow.go +++ b/receiver/otelarrowreceiver/otelarrow.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver/internal/arrow" @@ -45,7 +45,7 @@ type otelArrowReceiver struct { obsrepGRPC *receiverhelper.ObsReport netReporter *netstats.NetworkReporter - boundedQueue admission.Queue + boundedQueue admission2.Queue settings receiver.Settings } @@ -66,11 +66,11 @@ func newOTelArrowReceiver(cfg *Config, set receiver.Settings) (*otelArrowReceive if err != nil { return nil, err } - var bq admission.Queue + var bq admission2.Queue if cfg.Admission.RequestLimitMiB == 0 { - bq = admission.NewUnboundedQueue() + bq = admission2.NewUnboundedQueue() } else { - bq = admission.NewBoundedQueue(set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) + bq = admission2.NewBoundedQueue(set.TelemetrySettings, cfg.Admission.RequestLimitMiB<<20, cfg.Admission.WaitingLimitMiB<<20) } r := &otelArrowReceiver{ cfg: cfg, From 55e398dd0987d7e911a91063abd8d724dbe1f19e Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Tue, 12 Nov 2024 10:11:49 -0800 Subject: [PATCH 7/8] tidy --- internal/otelarrow/go.mod | 6 +----- internal/otelarrow/go.sum | 9 --------- receiver/otelarrowreceiver/go.mod | 4 ---- receiver/otelarrowreceiver/go.sum | 9 --------- 4 files changed, 1 insertion(+), 27 deletions(-) diff --git a/internal/otelarrow/go.mod b/internal/otelarrow/go.mod index 69880bc08759..e8c332872282 100644 --- a/internal/otelarrow/go.mod +++ b/internal/otelarrow/go.mod @@ -3,13 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelar go 1.22.0 require ( - github.com/google/uuid v1.6.0 github.com/klauspost/compress v1.17.11 github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter v0.113.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver v0.113.0 github.com/open-telemetry/otel-arrow v0.29.0 github.com/stretchr/testify v1.9.0 - github.com/wk8/go-ordered-map/v2 v2.1.8 go.opentelemetry.io/collector/component v0.113.0 go.opentelemetry.io/collector/config/configgrpc v0.113.0 go.opentelemetry.io/collector/config/configtelemetry v0.113.0 @@ -35,9 +33,7 @@ require ( github.com/apache/arrow/go/v16 v16.1.0 // indirect github.com/apache/arrow/go/v17 v17.0.0 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/brianvoe/gofakeit/v6 v6.17.0 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect @@ -50,12 +46,12 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/flatbuffers v24.3.25+incompatible // indirect + github.com/google/uuid v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect - github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/internal/otelarrow/go.sum b/internal/otelarrow/go.sum index 56e9a3589f8a..910d291f80b7 100644 --- a/internal/otelarrow/go.sum +++ b/internal/otelarrow/go.sum @@ -9,12 +9,8 @@ github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -51,7 +47,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -73,8 +68,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -100,8 +93,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= -github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index d561b7196b81..f64929e94b97 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -41,8 +41,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/apache/arrow/go/v17 v17.0.0 // indirect github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect - github.com/bahlo/generic-list-go v0.2.0 // indirect - github.com/buger/jsonparser v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect @@ -61,7 +59,6 @@ require ( github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect - github.com/mailru/easyjson v0.7.7 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect @@ -69,7 +66,6 @@ require ( github.com/mostynb/go-grpc-compression v1.2.3 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/collector/config/configcompression v1.19.0 // indirect diff --git a/receiver/otelarrowreceiver/go.sum b/receiver/otelarrowreceiver/go.sum index 9b78e9053a5f..8f96a877b47a 100644 --- a/receiver/otelarrowreceiver/go.sum +++ b/receiver/otelarrowreceiver/go.sum @@ -7,12 +7,8 @@ github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN github.com/apache/arrow/go/v17 v17.0.0/go.mod h1:jR7QHkODl15PfYyjM2nU+yTLScZ/qfj7OSUZmJ8putc= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc h1:Keo7wQ7UODUaHcEi7ltENhbAK2VgZjfat6mLy03tQzo= github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc/go.mod h1:k08r+Yj1PRAmuayFiRK6MYuR5Ve4IuZtTfxErMIh0+c= -github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= -github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/brianvoe/gofakeit/v6 v6.17.0 h1:obbQTJeHfktJtiZzq0Q1bEpsNUs+yHrYlPVWt7BtmJ4= github.com/brianvoe/gofakeit/v6 v6.17.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8= -github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs= -github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -47,7 +43,6 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -69,8 +64,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -96,8 +89,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= -github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= From c6634bb681cc49c8d9f2cf780783fa36875ce55b Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 13 Nov 2024 09:04:10 -0800 Subject: [PATCH 8/8] 32MiB default --- receiver/otelarrowreceiver/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/otelarrowreceiver/factory.go b/receiver/otelarrowreceiver/factory.go index 99e483ce6888..c440a8de925b 100644 --- a/receiver/otelarrowreceiver/factory.go +++ b/receiver/otelarrowreceiver/factory.go @@ -21,7 +21,7 @@ const ( defaultMemoryLimitMiB = 128 defaultRequestLimitMiB = 128 - defaultWaitingLimitMiB = 1000 + defaultWaitingLimitMiB = 32 ) // NewFactory creates a new OTel-Arrow receiver factory.