Skip to content

Commit

Permalink
Port PooledObjectReference, and experimentally integrate it with S3Ch…
Browse files Browse the repository at this point in the history
…annelContext
  • Loading branch information
rcaudy committed Feb 14, 2024
1 parent 3a35a56 commit 3b176d6
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package io.deephaven.base.reference;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* {@link SimpleReference} implementation with built-in reference-counting and pooling support.
*/
public abstract class PooledObjectReference<REFERENT_TYPE> implements SimpleReference<REFERENT_TYPE>, AutoCloseable {

/**
* Field updater for {@code state}.
*/
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<PooledObjectReference> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PooledObjectReference.class, "state");

/**
* An available reference with zero outstanding permits will have {@code state == 1}.
*/
private static final int AVAILABLE_ZERO_PERMITS = 1;

/**
* A cleared reference with zero outstanding permits will have {@code state == 0}.
*/
private static final int CLEARED_ZERO_PERMITS = 0;

/**
* The bit we use to denote availability.
*/
private static final int STATE_AVAILABLE_BIT = 1;

/**
* The quantity to add to state when incrementing the number of outstanding permits.
*/
private static final int STATE_PERMIT_ACQUIRE_QUANTITY = 2;

/**
* The quantity to add to state when decrementing the number of outstanding permits.
*/
private static final int STATE_PERMIT_RELEASE_QUANTITY = -STATE_PERMIT_ACQUIRE_QUANTITY;

private static boolean stateAllowsAcquire(final int currentState) {
return currentState != CLEARED_ZERO_PERMITS;
}

private static boolean stateIsAvailable(final int currentState) {
return (currentState & STATE_AVAILABLE_BIT) != 0;
}

private static boolean stateIsCleared(final int currentState) {
return (currentState & STATE_AVAILABLE_BIT) == 0;
}

private static int calculateNewStateForClear(final int currentState) {
return currentState ^ STATE_AVAILABLE_BIT;
}

private static int calculateNewStateForAcquire(final int currentState) {
return currentState + STATE_PERMIT_ACQUIRE_QUANTITY;
}

/**
* Try to atomically update {@code state}.
*
* @param currentState The expected value
* @param newState The desired result value
* @return Whether {@code state} was successfully updated
*/
private boolean tryUpdateState(final int currentState, final int newState) {
return STATE_UPDATER.compareAndSet(this, currentState, newState);
}

/**
* Atomically decrement the number of outstanding permits and get the new value of {@code state}.
*
* @return The new value of {@code state}
*/
private int decrementOutstandingPermits() {
return STATE_UPDATER.addAndGet(this, STATE_PERMIT_RELEASE_QUANTITY);
}

/**
* The actual referent. Set to null after {@code state == CLEARED_ZERO_PERMITS} by the responsible thread, which
* returns it to the pool.
*/
private volatile REFERENT_TYPE referent;

/**
* The state of this reference. The lowest bit is used to denote whether this reference is available (1) or cleared
* (0). The higher bits represent an integer count of the number of outstanding permits.
*/
private volatile int state = AVAILABLE_ZERO_PERMITS;

/**
* Construct a new PooledObjectReference to the supplied referent.
*
* @param referent The referent of this reference
*/
protected PooledObjectReference(@NotNull final REFERENT_TYPE referent) {
this.referent = Objects.requireNonNull(referent, "referent");
}

/**
* Get the referent. It is an error to call this method if the caller does not have any outstanding permits.
*
* @return The referent if this reference has not been cleared, null otherwise (which implies an error by the
* caller)
*/
@Override
public final REFERENT_TYPE get() {
onReferentAccessed();
return referent;
}

/**
* Callback for accounting purposes, e.g. in support of a LRU policy for pooled item reclamation.
*/
protected void onReferentAccessed() {}

/**
* Acquire an active use permit.
*
* @return Whether a permit was acquired
*/
public final boolean acquire() {
int currentState;
while (stateAllowsAcquire(currentState = state)) {
final int newState = calculateNewStateForAcquire(currentState);
if (tryUpdateState(currentState, newState)) {
return true;
}
}
return false;
}

/**
* Acquire an active use permit and return the referent, if possible.
*
* @return The referent, or null if no permit could be acquired
*/
@Nullable
public final REFERENT_TYPE acquireAndGet() {
if (acquire()) {
return get();
}
return null;
}

/**
* Release an active use permit. It is a serious error to release more permits than acquired.
*/
public final void release() {
final int newState = decrementOutstandingPermits();
if (newState < 0) {
throw new IllegalStateException(this + " released more than acquired");
}
if (!stateAllowsAcquire(newState)) {
returnReferentToPool(referent);
referent = null;
}
}

/**
* Clear this reference (and return its referent to the pool) when it no longer has any outstanding permits, which
* may mean immediately if the number of outstanding permits is already zero. All invocations after the first will
* have no effect.
*/
@Override
public final void clear() {
int currentState;
while (stateIsAvailable(currentState = state)) {
final int newState = calculateNewStateForClear(currentState);
if (tryUpdateState(currentState, newState)) {
if (!stateAllowsAcquire(newState)) {
returnReferentToPool(referent);
referent = null;
}
return;
}
}
}

/**
* Return the referent to the pool.
*
* @param referent The referent to return
*/
protected abstract void returnReferentToPool(@NotNull REFERENT_TYPE referent);

/**
* Synonym for {@link #release()}, intended for use as an {@link AutoCloseable} in a try-with-resources block.
*/
@Override
public final void close() {
release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,13 @@
import org.jetbrains.annotations.NotNull;

import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* Implements a recurring reference counting pattern - a concurrent reference count that should refuse to go below zero,
* and invokes {@link #onReferenceCountAtZero()} exactly once when the count returns to zero.
*/
public abstract class ReferenceCounted implements LogOutputAppendable, Serializable {

private static final long serialVersionUID = 1L;
public abstract class ReferenceCounted implements LogOutputAppendable {

/**
* Field updater for referenceCount, so we can avoid creating an {@link java.util.concurrent.atomic.AtomicInteger}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.deephaven.extensions.s3;

import io.deephaven.base.reference.PooledObjectReference;
import io.deephaven.base.reference.SimpleReference;
import io.deephaven.util.datastructures.SegmentedSoftPool;
import io.deephaven.util.referencecounting.ReferenceCounted;
import org.jetbrains.annotations.NotNull;

import java.nio.ByteBuffer;

Expand All @@ -21,14 +25,28 @@ final class BufferPool {
ByteBuffer::clear);
}

public ByteBuffer take(final int size) {
public PooledObjectReference<ByteBuffer> take(final int size) {
if (size > bufferSize) {
throw new IllegalArgumentException("Buffer size " + size + " is larger than pool size " + bufferSize);
}
return pool.take();
return new BufferReference(pool.take());
}

public void give(ByteBuffer buffer) {
private void give(ByteBuffer buffer) {
pool.give(buffer);
}

final class BufferReference extends PooledObjectReference<ByteBuffer> {

private volatile ByteBuffer buffer;

BufferReference(@NotNull final ByteBuffer buffer) {
super(buffer);
}

@Override
protected void returnReferentToPool(@NotNull ByteBuffer referent) {
give(referent);
}
}
}
Loading

0 comments on commit 3b176d6

Please sign in to comment.