Skip to content

Commit

Permalink
Move general util classes to their own types
Browse files Browse the repository at this point in the history
  • Loading branch information
niloc132 committed Jun 1, 2024
1 parent 41b0407 commit 390d96c
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1191,88 +1191,6 @@ public RowSetShiftDataGenerator(final RowSetShiftData shifted) throws IOExceptio
}
}

public static class DrainableByteArrayInputStream extends DefensiveDrainable {

private byte[] buf;
private final int offset;
private final int length;

public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) {
this.buf = Objects.requireNonNull(buf);
this.offset = offset;
this.length = length;
}

@Override
public int available() {
if (buf == null) {
return 0;
}
return length;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
if (buf != null) {
try {
outputStream.write(buf, offset, length);
} finally {
buf = null;
}
return length;
}
return 0;
}
}

public static class ConsecutiveDrainableStreams extends DefensiveDrainable {
final DefensiveDrainable[] streams;

public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) {
this.streams = streams;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
final int expected = total + stream.available();
total += ((Drainable) stream).drainTo(outputStream);
if (expected != total) {
throw new IllegalStateException("drained message drained wrong number of bytes");
}
if (total < 0) {
throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE");
}
}
return total;
}

@Override
public int available() throws SizeException, IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
total += stream.available();
if (total < 0) {
throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total);
}
}
return total;
}

@Override
public void close() throws IOException {
for (final DefensiveDrainable stream : streams) {
try {
stream.close();
} catch (final IOException e) {
throw new UncheckedDeephavenException("unexpected IOException", e);
}
}
super.close();
}
}

private static final class EmptyRowSetGenerator extends RowSetGenerator {
public static final EmptyRowSetGenerator INSTANCE;
static {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.UncheckedDeephavenException;
import io.deephaven.extensions.barrage.util.DefensiveDrainable;
import io.deephaven.util.datastructures.SizeException;
import io.grpc.Drainable;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.io.OutputStream;

public class ConsecutiveDrainableStreams extends DefensiveDrainable {
final DefensiveDrainable[] streams;

public ConsecutiveDrainableStreams(final @NotNull DefensiveDrainable... streams) {
this.streams = streams;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
final int expected = total + stream.available();
total += ((Drainable) stream).drainTo(outputStream);
if (expected != total) {
throw new IllegalStateException("drained message drained wrong number of bytes");
}
if (total < 0) {
throw new IllegalStateException("drained message is too large; exceeds Integer.MAX_VALUE");
}
}
return total;
}

@Override
public int available() throws SizeException, IOException {
int total = 0;
for (final DefensiveDrainable stream : streams) {
total += stream.available();
if (total < 0) {
throw new SizeException("drained message is too large; exceeds Integer.MAX_VALUE", total);
}
}
return total;
}

@Override
public void close() throws IOException {
for (final DefensiveDrainable stream : streams) {
try {
stream.close();
} catch (final IOException e) {
throw new UncheckedDeephavenException("unexpected IOException", e);
}
}
super.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.extensions.barrage.util.DefensiveDrainable;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

public class DrainableByteArrayInputStream extends DefensiveDrainable {

private byte[] buf;
private final int offset;
private final int length;

public DrainableByteArrayInputStream(final byte[] buf, final int offset, final int length) {
this.buf = Objects.requireNonNull(buf);
this.offset = offset;
this.length = length;
}

@Override
public int available() {
if (buf == null) {
return 0;
}
return length;
}

@Override
public int drainTo(final OutputStream outputStream) throws IOException {
if (buf != null) {
try {
outputStream.write(buf, offset, length);
} finally {
buf = null;
}
return length;
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ public class BarrageStreamGeneratorTest {
@Test
public void testDrainableStreamIsEmptied() throws IOException {
final int length = 512;
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream inputStream =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final DrainableByteArrayInputStream inputStream =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);

int bytesRead = inputStream.drainTo(new NullOutputStream());

Expand All @@ -26,12 +26,11 @@ public void testDrainableStreamIsEmptied() throws IOException {
@Test
public void testConsecutiveDrainableStreamIsEmptied() throws IOException {
final int length = 512;
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in1 =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final BarrageStreamGeneratorImpl.DrainableByteArrayInputStream in2 =
new BarrageStreamGeneratorImpl.DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams inputStream =
new BarrageStreamGeneratorImpl.ConsecutiveDrainableStreams(in1, in2);
final DrainableByteArrayInputStream in1 =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final DrainableByteArrayInputStream in2 =
new DrainableByteArrayInputStream(new byte[length * 2], length / 2, length);
final ConsecutiveDrainableStreams inputStream = new ConsecutiveDrainableStreams(in1, in2);

int bytesRead = inputStream.drainTo(new NullOutputStream());

Expand Down

0 comments on commit 390d96c

Please sign in to comment.