Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(internal/otelarrow) Add new LIFO boundedqueue #36140

Merged
merged 10 commits into from
Nov 12, 2024
27 changes: 27 additions & 0 deletions .chloggen/otelarrow-lifo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: otelarrowreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a new LIFO-based bounded queue.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
32 changes: 32 additions & 0 deletions internal/otelarrow/admission2/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Admission Package

## Overview

The admission package provides a BoundedQueue object. This object
implements a semaphore for limiting the number of bytes admitted into
a collector pipeline. Additionally, the BoundedQueue limits the
number of bytes allowed to block on a call to `Acquire(pending int64)`.

There are two error conditions generated within this code:

- `rejecting request, too much pending data`: When the limit on waiting bytes its reached, this will be returned to limit the total amount waiting.
- `rejecting request, request is too large`: When an individual request exceeds the configured limit, this will be returned without acquiring or waiting.

The BoundedQueue implements LIFO semantics. See this
[article](https://medium.com/swlh/fifo-considered-harmful-793b76f98374)
explaining why it is preferred to FIFO semantics.

## Usage

Create a new BoundedQueue by calling `bq := admission.NewBoundedQueue(maxLimitBytes, maxLimitWaiting)`

Within the component call `bq.Acquire(ctx, requestSize)` which will:

1. succeed immediately if there is enough available memory,
2. fail immediately if there are too many waiters, or
3. block until context cancelation or enough bytes becomes available.

When the resources have been acquired successfully, a closure is
returned that, when called, will release the semaphore. When the
semaphore is released, pending waiters that can be satisfied will
acquire the resource and become unblocked.
177 changes: 177 additions & 0 deletions internal/otelarrow/admission2/boundedqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package admission2 // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/admission2"

import (
"container/list"
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
grpccodes "google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var ErrTooMuchWaiting = status.Error(grpccodes.ResourceExhausted, "rejecting request, too much pending data")
var ErrRequestTooLarge = status.Errorf(grpccodes.InvalidArgument, "rejecting request, request is too large")

// BoundedQueue is a LIFO-oriented admission-controlled Queue.
type BoundedQueue struct {
maxLimitAdmit uint64
maxLimitWait uint64
tracer trace.Tracer

// lock protects currentAdmitted, currentWaiting, and waiters

lock sync.Mutex
currentAdmitted uint64
currentWaiting uint64
waiters *list.List // of *waiter
}

var _ Queue = &BoundedQueue{}

// waiter is an item in the BoundedQueue waiters list.
type waiter struct {
notify N
pending uint64
}

// NewBoundedQueue returns a LIFO-oriented Queue implementation which
// admits `maxLimitAdmit` bytes concurrently and allows up to
// `maxLimitWait` bytes to wait for admission.
func NewBoundedQueue(ts component.TelemetrySettings, maxLimitAdmit, maxLimitWait uint64) Queue {
return &BoundedQueue{
maxLimitAdmit: maxLimitAdmit,
maxLimitWait: maxLimitWait,
waiters: list.New(),
tracer: ts.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow"),
}
}

// acquireOrGetWaiter returns with three distinct conditions depending
// on whether it was accepted, rejected, or asked to wait.
//
// - element=nil, error=nil: the fast success path
// - element=nil, error=non-nil: the fast failure path
// - element=non-nil, error=non-nil: the slow success path
func (bq *BoundedQueue) acquireOrGetWaiter(pending uint64) (*list.Element, error) {
if pending > bq.maxLimitAdmit {
// when the request will never succeed because it is
// individually over the total limit, fail fast.
return nil, ErrRequestTooLarge
}
jmacd marked this conversation as resolved.
Show resolved Hide resolved

bq.lock.Lock()
defer bq.lock.Unlock()

if bq.currentAdmitted+pending <= bq.maxLimitAdmit {
// the fast success path.
bq.currentAdmitted += pending
return nil, nil
}

// since we were unable to admit, check if we can wait.
if bq.currentWaiting+pending > bq.maxLimitWait {
return nil, ErrTooMuchWaiting
}
jmacd marked this conversation as resolved.
Show resolved Hide resolved

// otherwise we need to wait
return bq.addWaiterLocked(pending), nil
}

// Acquire implements Queue.
func (bq *BoundedQueue) Acquire(ctx context.Context, pending uint64) (ReleaseFunc, error) {
element, err := bq.acquireOrGetWaiter(pending)
parentSpan := trace.SpanFromContext(ctx)
pendingAttr := trace.WithAttributes(attribute.Int64("pending", int64(pending)))

if err != nil {
parentSpan.AddEvent("admission rejected (fast path)", pendingAttr)
return noopRelease, err
} else if element == nil {
parentSpan.AddEvent("admission accepted (fast path)", pendingAttr)
return bq.releaseFunc(pending), nil
}

parentSpan.AddEvent("enter admission queue")

ctx, span := bq.tracer.Start(ctx, "admission_blocked", pendingAttr)
defer span.End()

waiter := element.Value.(*waiter)

select {
case <-waiter.notify.Chan():
parentSpan.AddEvent("admission accepted (slow path)", pendingAttr)
return bq.releaseFunc(pending), nil

case <-ctx.Done():
bq.lock.Lock()
defer bq.lock.Unlock()

if waiter.notify.HasBeenNotified() {
// We were also admitted, which can happen
// concurrently with cancellation. Make sure
// to release since no one else will do it.
bq.releaseLocked(pending)
} else {
// Remove ourselves from the list of waiters
// so that we can't be admitted in the future.
bq.removeWaiterLocked(pending, element)
bq.admitWaitersLocked()
}

parentSpan.AddEvent("admission rejected (canceled)", pendingAttr)
return noopRelease, status.Error(grpccodes.Canceled, context.Cause(ctx).Error())
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}
}

func (bq *BoundedQueue) admitWaitersLocked() {
for bq.waiters.Len() != 0 {
// Ensure there is enough room to admit the next waiter.
element := bq.waiters.Back()
waiter := element.Value.(*waiter)
if bq.currentAdmitted+waiter.pending > bq.maxLimitAdmit {
// Returning means continuing to wait for the
// most recent arrival to get service by another release.
jmacd marked this conversation as resolved.
Show resolved Hide resolved
return
}

// Release the next waiter and tell it that it has been admitted.
bq.removeWaiterLocked(waiter.pending, element)
bq.currentAdmitted += waiter.pending

waiter.notify.Notify()
}
}

func (bq *BoundedQueue) addWaiterLocked(pending uint64) *list.Element {
bq.currentWaiting += pending
return bq.waiters.PushBack(&waiter{
jmacd marked this conversation as resolved.
Show resolved Hide resolved
pending: pending,
notify: newNotification(),
})
}

func (bq *BoundedQueue) removeWaiterLocked(pending uint64, element *list.Element) {
bq.currentWaiting -= pending
bq.waiters.Remove(element)
}

func (bq *BoundedQueue) releaseLocked(pending uint64) {
bq.currentAdmitted -= pending
bq.admitWaitersLocked()
}

func (bq *BoundedQueue) releaseFunc(pending uint64) ReleaseFunc {
return func() {
bq.lock.Lock()
defer bq.lock.Unlock()

bq.releaseLocked(pending)
}
}
Loading