diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java index 42c5889e3d3..3ce7d14fc9d 100644 --- a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorImpl.java @@ -1191,88 +1191,6 @@ public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOExceptio } } - public static class DrainableByteArrayInputStream extends DefensiveDrainable { - - private byte[] buf; - private final int offset; - private final int length; - - public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) { - this.buf = Objects.requireNonNull(buf); - this.offset = offset; - this.length = length; - } - - @Override - public int available() { - if (buf == null) { - return 0; - } - return length; - } - - @Override - public int drainTo(final OutputStream outputStream) throws IOException { - if (buf != null) { - try { - outputStream.write(buf, offset, length); - } finally { - buf = null; - } - return length; - } - return 0; - } - } - - public static class ConsecutiveDrainableStreams extends DefensiveDrainable { - final DefensiveDrainable[] streams; - - public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) { - this.streams = streams; - } - - @Override - public int drainTo(final OutputStream outputStream) throws IOException { - int total = 0; - for (final DefensiveDrainable stream : streams) { - final int expected = total + stream.available(); - total += ((Drainable) stream).drainTo(outputStream); - if (expected != total) { - throw new IllegalStateException("drained message drained wrong number of bytes"); - } - if (total < 0) { - throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); - } - } - return total; - } - - @Override - public int available() throws SizeException, IOException { - int total = 0; - for (final DefensiveDrainable stream : streams) { - total += stream.available(); - if (total < 0) { - throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); - } - } - return total; - } - - @Override - public void close() throws IOException { - for (final DefensiveDrainable stream : streams) { - try { - stream.close(); - } catch (final IOException e) { - throw new UncheckedDeephavenException("unexpected IOException", e); - } - } - super.close(); - } - } - private static final class EmptyRowSetGenerator extends RowSetGenerator { public static final EmptyRowSetGenerator INSTANCE; static { diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java new file mode 100644 index 00000000000..c507e14e0b4 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/ConsecutiveDrainableStreams.java @@ -0,0 +1,61 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.UncheckedDeephavenException; +import io.deephaven.extensions.barrage.util.DefensiveDrainable; +import io.deephaven.util.datastructures.SizeException; +import io.grpc.Drainable; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.io.OutputStream; + +public class ConsecutiveDrainableStreams extends DefensiveDrainable { + final DefensiveDrainable[] streams; + + public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) { + this.streams = streams; + } + + @Override + public int drainTo(final OutputStream outputStream) throws IOException { + int total = 0; + for (final DefensiveDrainable stream : streams) { + final int expected = total + stream.available(); + total += ((Drainable) stream).drainTo(outputStream); + if (expected != total) { + throw new IllegalStateException("drained message drained wrong number of bytes"); + } + if (total < 0) { + throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE"); + } + } + return total; + } + + @Override + public int available() throws SizeException, IOException { + int total = 0; + for (final DefensiveDrainable stream : streams) { + total += stream.available(); + if (total < 0) { + throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total); + } + } + return total; + } + + @Override + public void close() throws IOException { + for (final DefensiveDrainable stream : streams) { + try { + stream.close(); + } catch (final IOException e) { + throw new UncheckedDeephavenException("unexpected IOException", e); + } + } + super.close(); + } +} diff --git a/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java new file mode 100644 index 00000000000..f2b14a7dc44 --- /dev/null +++ b/extensions/barrage/src/main/java/io/deephaven/extensions/barrage/DrainableByteArrayInputStream.java @@ -0,0 +1,44 @@ +// +// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending +// +package io.deephaven.extensions.barrage; + +import io.deephaven.extensions.barrage.util.DefensiveDrainable; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Objects; + +public class DrainableByteArrayInputStream extends DefensiveDrainable { + + private byte[] buf; + private final int offset; + private final int length; + + public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) { + this.buf = Objects.requireNonNull(buf); + this.offset = offset; + this.length = length; + } + + @Override + public int available() { + if (buf == null) { + return 0; + } + return length; + } + + @Override + public int drainTo(final OutputStream outputStream) throws IOException { + if (buf != null) { + try { + outputStream.write(buf, offset, length); + } finally { + buf = null; + } + return length; + } + return 0; + } +} diff --git a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java index e6c59d7efb8..73be2b851af 100644 --- a/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java +++ b/extensions/barrage/src/test/java/io/deephaven/extensions/barrage/BarrageStreamGeneratorTest.java @@ -14,8 +14,8 @@ public class BarrageStreamGeneratorTest { @Test public void testDrainableStreamIsEmptied() throws IOException { final int length = 512; - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream inputStream = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final DrainableByteArrayInputStream inputStream = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); int bytesRead = inputStream.drainTo(new NullOutputStream()); @@ -26,12 +26,11 @@ public void testDrainableStreamIsEmptied() throws IOException { @Test public void testConsecutiveDrainableStreamIsEmptied() throws IOException { final int length = 512; - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in1 = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); - final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in2 = - new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); - final BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams inputStream = - new BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams(in1, in2); + final DrainableByteArrayInputStream in1 = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final DrainableByteArrayInputStream in2 = + new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length); + final ConsecutiveDrainableStreams inputStream = new ConsecutiveDrainableStreams(in1, in2); int bytesRead = inputStream.drainTo(new NullOutputStream());