> {
* @throws RuntimeException Other exceptions
*/
default void writeBytes(final byte[] bytes, final String key, int timeoutInMillis) throws Exception {
- throw new RuntimeException("Not supported");
+ throw new UnsupportedOperationException("This buffer type does not support bytes.");
}
/**
@@ -92,6 +92,17 @@ default Duration getDrainTimeout() {
return Duration.ZERO;
}
+ /**
+ * Indicates if writes to this buffer are also in some way written
+ * onto the JVM heap. If writes do go on heap, this should false
+ * which is the default.
+ *
+ * @return True if this buffer does not write to the JVM heap.
+ */
+ default boolean isWrittenOffHeapOnly() {
+ return false;
+ }
+
/**
* shuts down the buffer
*/
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/DelegatingBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/DelegatingBuffer.java
new file mode 100644
index 0000000000..84f57883ea
--- /dev/null
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/DelegatingBuffer.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.model.buffer;
+
+import org.opensearch.dataprepper.model.CheckpointState;
+import org.opensearch.dataprepper.model.record.Record;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An implementation of {@link Buffer} which delegates all calls to a delgate
+ * (or inner) buffer.
+ *
+ * This class exists to help with writing decorators of the {@link Buffer} interface.
+ *
+ * @param The type of data in the buffer
+ *
+ * @since 2.6
+ */
+public abstract class DelegatingBuffer> implements Buffer {
+ private final Buffer delegateBuffer;
+
+ /**
+ * Constructor for subclasses to use.
+ *
+ * @param delegateBuffer The delegate (or inner) buffer.
+ *
+ * @since 2.6
+ */
+ protected DelegatingBuffer(final Buffer delegateBuffer) {
+ this.delegateBuffer = Objects.requireNonNull(delegateBuffer);
+ }
+
+ @Override
+ public void write(final T record, final int timeoutInMillis) throws TimeoutException {
+ delegateBuffer.write(record, timeoutInMillis);
+ }
+
+ @Override
+ public void writeAll(final Collection records, final int timeoutInMillis) throws Exception {
+ delegateBuffer.writeAll(records, timeoutInMillis);
+ }
+
+ @Override
+ public void writeBytes(final byte[] bytes, final String key, final int timeoutInMillis) throws Exception {
+ delegateBuffer.writeBytes(bytes, key, timeoutInMillis);
+ }
+
+ @Override
+ public Map.Entry, CheckpointState> read(final int timeoutInMillis) {
+ return delegateBuffer.read(timeoutInMillis);
+ }
+
+ @Override
+ public void checkpoint(final CheckpointState checkpointState) {
+ delegateBuffer.checkpoint(checkpointState);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegateBuffer.isEmpty();
+ }
+
+ @Override
+ public boolean isByteBuffer() {
+ return delegateBuffer.isByteBuffer();
+ }
+
+ @Override
+ public Duration getDrainTimeout() {
+ return delegateBuffer.getDrainTimeout();
+ }
+
+ @Override
+ public boolean isWrittenOffHeapOnly() {
+ return delegateBuffer.isWrittenOffHeapOnly();
+ }
+
+ @Override
+ public void shutdown() {
+ delegateBuffer.shutdown();
+ }
+}
diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java
index 6924cb35bc..a8ea4a3ee1 100644
--- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java
+++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/configuration/PluginSetting.java
@@ -161,6 +161,7 @@ public List getTypedList(final String attribute, final Class type) {
* Returns the value of the specified {@literal List