Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTel-Arrow receiver admission limits: redesign #36033

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-admission.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Admission control improvements (LIFO); admission.waiter_limit is deprecated, replaced with admission.waiting_limit_mib.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
32 changes: 22 additions & 10 deletions internal/otelarrow/admission/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,31 @@

## Overview

The admission package provides a BoundedQueue object which is a semaphore implementation that limits the number of bytes admitted into a collector pipeline. Additionally the BoundedQueue limits the number of waiters that can block on a call to `bq.Acquire(sz int64)`.
The admission package provides a BoundedQueue object. This object
implements a semaphore for limiting the number of bytes admitted into
a collector pipeline. Additionally, the BoundedQueue limits the
number of bytes allowed to block on a call to `Acquire(pending int64)`.

This package is an experiment to improve the behavior of Collector pipelines having their `exporterhelper` configured to apply backpressure. This package is meant to be used in receivers, via an interceptor or custom logic. Therefore, the BoundedQueue helps limit memory within the entire collector pipeline by limiting two dimensions that cause memory issues:
1. bytes: large requests that enter the collector pipeline can require large allocations even if downstream components will eventually limit or ratelimit the request.
2. waiters: limiting on bytes alone is not enough because requests that enter the pipeline and block on `bq.Acquire()` can still consume memory within the receiver. If there are enough waiters this can be a significant contribution to memory usage.
There are two error conditions generated within this code:

- `rejecting request, too much pending data`: When the limit on waiting bytes its reached, this will be returned to limit the total amount waiting.
- `rejecting request, request is too large`: When an individual request exceeds the configured limit, this will be returned without acquiring or waiting.

The BoundedQueue implements LIFO semantics. See this
[article](https://medium.com/swlh/fifo-considered-harmful-793b76f98374)
explaining why it is preferred to FIFO semantics.

## Usage

Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiters)`
Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiting)`

Within the component call `bq.Acquire(ctx, requestSize)` which will:

Within the component call `bq.Acquire(ctx, requestSize)` which will either
1. succeed immediately if there is enough available memory
2. fail immediately if there are too many waiters
3. block until context cancelation or enough bytes becomes available
1. succeed immediately if there is enough available memory,
2. fail immediately if there are too many waiters, or
3. block until context cancelation or enough bytes becomes available.

Once a request has finished processing and is sent downstream call `bq.Release(requestSize)` to allow waiters to be admitted for processing. Release should only fail if releasing more bytes than previously acquired.
When the resources have been acquired successfully, a closure is
returned that, when called, will release the semaphore. When the
semaphore is released, pending waiters that can be satisfied will
acquire the resource and become unblocked.
215 changes: 116 additions & 99 deletions internal/otelarrow/admission/boundedqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,157 +4,174 @@
package admission // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission"

import (
"container/list"
"context"
"fmt"
"sync"

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var ErrTooManyWaiters = fmt.Errorf("rejecting request, too many waiters")
var ErrTooMuchWaiting = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data")
var ErrRequestTooLarge = status.Errorf(grpccodes.InvalidArgument, "rejecting request, request is too large")

// BoundedQueue is a LIFO-oriented admission-controlled Queue.
type BoundedQueue struct {
maxLimitBytes int64
maxLimitWaiters int64
currentBytes int64
currentWaiters int64
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer

// lock protects currentAdmitted, currentWaiting, and waiters

lock sync.Mutex
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
tracer trace.Tracer
currentAdmitted uint64
currentWaiting uint64
waiters *list.List // of *waiter
}

var _ Queue = &BoundedQueue{}

// waiter is an item in the BoundedQueue waiters list.
type waiter struct {
readyCh chan struct{}
pendingBytes int64
ID uuid.UUID
notify N
pending uint64
}

func NewBoundedQueue(tp trace.TracerProvider, maxLimitBytes, maxLimitWaiters int64) *BoundedQueue {
// NewBoundedQueue returns a LIFO-oriented Queue implementation which
// admits `maxLimitAdmit` bytes concurrently and allows up to
// `maxLimitWait` bytes to wait for admission.
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue {
return &BoundedQueue{
maxLimitBytes: maxLimitBytes,
maxLimitWaiters: maxLimitWaiters,
waiters: orderedmap.New[uuid.UUID, waiter](),
tracer: tp.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
maxLimitAdmit: maxLimitAdmit,
maxLimitWait: maxLimitWait,
waiters: list.New(),
tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
}
}

func (bq *BoundedQueue) admit(pendingBytes int64) (bool, error) {
// acquireOrGetWaiter returns with three distinct conditions depending
// on whether it was accepted, rejected, or asked to wait.
//
// - element=nil, error=nil: the fast success path
// - element=nil, error=non-nil: the fast failure path
// - element=non-nil, error=non-nil: the slow success path
func (bq *BoundedQueue) acquireOrGetWaiter(pending uint64) (*list.Element, error) {
bq.lock.Lock()
defer bq.lock.Unlock()

if pendingBytes > bq.maxLimitBytes { // will never succeed
return false, fmt.Errorf("rejecting request, request size larger than configured limit")
if pending > bq.maxLimitAdmit {
// when the request will never succeed because it is
// individually over the total limit, fail fast.
return nil, ErrRequestTooLarge
}

if bq.currentBytes+pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
bq.currentBytes += pendingBytes
return true, nil
if bq.currentAdmitted+pending <= bq.maxLimitAdmit {
// the fast success path.
bq.currentAdmitted += pending
return nil, nil
}

// since we were unable to admit, check if we can wait.
if bq.currentWaiters+1 > bq.maxLimitWaiters { // too many waiters
return false, ErrTooManyWaiters
if bq.currentWaiting+pending > bq.maxLimitWait {
return nil, ErrTooMuchWaiting
}

// if we got to this point we need to wait to acquire bytes, so update currentWaiters before releasing mutex.
bq.currentWaiters++
return false, nil
// otherwise we need to wait
return bq.addWaiterLocked(pending), nil
}

func (bq *BoundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
success, err := bq.admit(pendingBytes)
if err != nil || success {
return err
// Acquire implements Queue.
func (bq *BoundedQueue) Acquire(ctx context.Context, pending uint64) (ReleaseFunc, error) {
element, err := bq.acquireOrGetWaiter(pending)
parentSpan := trace.SpanFromContext(ctx)
pendingAttr := trace.WithAttributes(attribute.Int64("pending", int64(pending)))

if err != nil {
parentSpan.AddEvent("admission rejected (fast path)", pendingAttr)
return noopRelease, err
} else if element == nil {
parentSpan.AddEvent("admission accepted (fast path)", pendingAttr)
return bq.releaseFunc(pending), nil
}

// otherwise we need to wait for bytes to be released
curWaiter := waiter{
pendingBytes: pendingBytes,
readyCh: make(chan struct{}),
}
parentSpan.AddEvent("enter admission queue")

bq.lock.Lock()

// generate unique key
for {
id := uuid.New()
_, keyExists := bq.waiters.Get(id)
if keyExists {
continue
}
bq.waiters.Set(id, curWaiter)
curWaiter.ID = id
break
}

bq.lock.Unlock()
ctx, span := bq.tracer.Start(ctx, "admission_blocked",
trace.WithAttributes(attribute.Int64("pending", pendingBytes)))
ctx, span := bq.tracer.Start(ctx, "admission_blocked", pendingAttr)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the use of list.List avoids the need to generate a new key. This simplification is possible because we always remove in LIFO order and don't require random access.

defer span.End()

waiter := element.Value.(*waiter)

select {
case <-curWaiter.readyCh:
return nil
case <-waiter.notify.Chan():
parentSpan.AddEvent("admission accepted (slow path)", pendingAttr)
return bq.releaseFunc(pending), nil

Comment on lines +108 to +111
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review notes: This is the same logic as before, but we return a release function to avoid fallible interfaces.

case <-ctx.Done():
// canceled before acquired so remove waiter.
bq.lock.Lock()
defer bq.lock.Unlock()
err = fmt.Errorf("context canceled: %w ", ctx.Err())
span.SetStatus(codes.Error, "context canceled")

_, found := bq.waiters.Delete(curWaiter.ID)
if !found {
return err
if waiter.notify.HasBeenNotified() {
// We were also admitted, which can happen
// concurrently with cancellation. Make sure
// to release since no one else will do it.
bq.releaseLocked(pending)
Comment on lines +117 to +120
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review notes: this is a new code path, fixes the race condition.

} else {
// Remove ourselves from the list of waiters
// so that we can't be admitted in the future.
bq.removeWaiterLocked(pending, element)
bq.admitWaitersLocked()
}

bq.currentWaiters--
return err
parentSpan.AddEvent("admission rejected (canceled)", pendingAttr)
return noopRelease, status.Error(grpccodes.Canceled, context.Cause(ctx).Error())
}
}

func (bq *BoundedQueue) Release(pendingBytes int64) error {
bq.lock.Lock()
defer bq.lock.Unlock()
func (bq *BoundedQueue) admitWaitersLocked() {
for bq.waiters.Len() != 0 {
// Ensure there is enough room to admit the next waiter.
element := bq.waiters.Back()
waiter := element.Value.(*waiter)
if bq.currentAdmitted+waiter.pending > bq.maxLimitAdmit {
// Returning means continuing to wait for the
// most recent arrival to get service by another release.
return
}

bq.currentBytes -= pendingBytes
// Release the next waiter and tell it that it has been admitted.
bq.removeWaiterLocked(waiter.pending, element)
bq.currentAdmitted += waiter.pending

if bq.currentBytes < 0 {
return fmt.Errorf("released more bytes than acquired")
waiter.notify.Notify()
}
}

for {
if bq.waiters.Len() == 0 {
return nil
}
next := bq.waiters.Oldest()
nextWaiter := next.Value
nextKey := next.Key
if bq.currentBytes+nextWaiter.pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += nextWaiter.pendingBytes
bq.currentWaiters--
close(nextWaiter.readyCh)
_, found := bq.waiters.Delete(nextKey)
if !found {
return fmt.Errorf("deleting waiter that doesn't exist")
}
continue
}
break
}
func (bq *BoundedQueue) addWaiterLocked(pending uint64) *list.Element {
bq.currentWaiting += pending
return bq.waiters.PushBack(&waiter{
pending: pending,
notify: newNotification(),
})
}

return nil
func (bq *BoundedQueue) removeWaiterLocked(pending uint64, element *list.Element) {
bq.currentWaiting -= pending
bq.waiters.Remove(element)
}

func (bq *BoundedQueue) TryAcquire(pendingBytes int64) bool {
bq.lock.Lock()
defer bq.lock.Unlock()
if bq.currentBytes+pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += pendingBytes
return true
func (bq *BoundedQueue) releaseLocked(pending uint64) {
bq.currentAdmitted -= pending
bq.admitWaitersLocked()
}

func (bq *BoundedQueue) releaseFunc(pending uint64) ReleaseFunc {
return func() {
bq.lock.Lock()
defer bq.lock.Unlock()

bq.releaseLocked(pending)
}
return false
}
Loading
Loading