diff --git a/common/src/main/java/com/zegelin/prometheus/exposition/FormattedByteChannel.java b/common/src/main/java/com/zegelin/prometheus/exposition/FormattedByteChannel.java index 3363025..4278ba1 100644 --- a/common/src/main/java/com/zegelin/prometheus/exposition/FormattedByteChannel.java +++ b/common/src/main/java/com/zegelin/prometheus/exposition/FormattedByteChannel.java @@ -1,32 +1,50 @@ package com.zegelin.prometheus.exposition; +import com.google.common.annotations.VisibleForTesting; + import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; public class FormattedByteChannel implements ReadableByteChannel { - public static final int MIN_CHUNK_SIZE = 1024 * 1024; - public static final int MAX_CHUNK_SIZE = MIN_CHUNK_SIZE * 5; + public static final int DEFAULT_CHUNK_THRESHOLD = 1024 * 1024; + public static final int MAX_CHUNK_SIZE = DEFAULT_CHUNK_THRESHOLD * 5; private final FormattedExposition formattedExposition; + private final int chunkThreshold; + + public FormattedByteChannel(final FormattedExposition formattedExposition) { + this(formattedExposition, DEFAULT_CHUNK_THRESHOLD); + } - public FormattedByteChannel(FormattedExposition formattedExposition) { + @VisibleForTesting + FormattedByteChannel(final FormattedExposition formattedExposition, final int chunkThreshold) { this.formattedExposition = formattedExposition; + this.chunkThreshold = chunkThreshold; } @Override - public int read(ByteBuffer dst) { + public int read(final ByteBuffer dst) { if (!isOpen()) { return -1; } - NioExpositionSink sink = new NioExpositionSink(dst); - while (sink.getIngestedByteCount() < MIN_CHUNK_SIZE && isOpen()) { + // Forcing the calling ChunkedNioStream to flush the buffer + if (hasBufferReachedChunkThreshold(dst)) { + return -1; + } + + final NioExpositionSink sink = new NioExpositionSink(dst); + while (!hasBufferReachedChunkThreshold(dst) && isOpen()) { formattedExposition.nextSlice(sink); } return sink.getIngestedByteCount(); } + private boolean hasBufferReachedChunkThreshold(final ByteBuffer dst) { + return dst.position() >= chunkThreshold; + } + @Override public boolean isOpen() { return !formattedExposition.isEndOfInput(); diff --git a/common/src/test/java/com/zegelin/prometheus/exposition/TestFormattedByteChannel.java b/common/src/test/java/com/zegelin/prometheus/exposition/TestFormattedByteChannel.java index adebcd4..bfed417 100644 --- a/common/src/test/java/com/zegelin/prometheus/exposition/TestFormattedByteChannel.java +++ b/common/src/test/java/com/zegelin/prometheus/exposition/TestFormattedByteChannel.java @@ -1,5 +1,9 @@ package com.zegelin.prometheus.exposition; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.stream.ChunkedNioStream; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; @@ -8,13 +12,15 @@ import java.nio.ByteBuffer; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; public class TestFormattedByteChannel { @Mock - private FormattedExposition formattedExposition; + private ChannelHandlerContext ctx; + + private ChunkedNioStream chunkedNioStream; + + private TenSliceExposition formattedExposition; private ByteBuffer buffer; private FormattedByteChannel channel; @@ -22,13 +28,18 @@ public class TestFormattedByteChannel { @BeforeMethod public void before() { MockitoAnnotations.initMocks(this); + buffer = ByteBuffer.allocate(128); - channel = new FormattedByteChannel(formattedExposition); + formattedExposition = new TenSliceExposition(); + channel = new FormattedByteChannel(formattedExposition, 64); + + when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT); + chunkedNioStream = new ChunkedNioStream(channel, 128); } @Test public void testClosed() { - when(formattedExposition.isEndOfInput()).thenReturn(true); + formattedExposition.setSlices(0); assertThat(channel.read(buffer)).isEqualTo(-1); assertThat(channel.isOpen()).isEqualTo(false); @@ -36,21 +47,67 @@ public void testClosed() { @Test public void testOpen() { - when(formattedExposition.isEndOfInput()).thenReturn(false); + formattedExposition.setSlices(1); assertThat(channel.isOpen()).isEqualTo(true); } @Test - public void testOneChunk() { - when(formattedExposition.isEndOfInput()).thenReturn(false).thenReturn(false).thenReturn(true); - doAnswer(invocation -> { - NioExpositionSink sink = invocation.getArgument(0); + public void testOneSlice() throws Exception { + formattedExposition.setSlices(1); + ByteBuf byteBuf; + + byteBuf = chunkedNioStream.readChunk(ctx); + assertThat(byteBuf.readableBytes()).isEqualTo(10); + assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true); + } + + @Test + public void testTwoSlices() throws Exception { + formattedExposition.setSlices(2); + ByteBuf byteBuf; + + byteBuf = chunkedNioStream.readChunk(ctx); + assertThat(byteBuf.readableBytes()).isEqualTo(20); + assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true); + } + + @Test + public void testTwoChunks() throws Exception { + formattedExposition.setSlices(10); + ByteBuf byteBuf; + + byteBuf = chunkedNioStream.readChunk(ctx); + assertThat(byteBuf.readableBytes()).isEqualTo(70); + assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(false); + + byteBuf = chunkedNioStream.readChunk(ctx); + assertThat(byteBuf.readableBytes()).isEqualTo(30); + assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true); + } + + // A dummy Exposition implementation that will generate a specific number of slices of size 10. + private static class TenSliceExposition implements FormattedExposition { + private int slices = 0; + private int currentSlice = 0; + + private void setSlices(final int chunks) { + this.slices = chunks; + } + + @Override + public void nextSlice(final ExpositionSink sink) { + if (isEndOfInput()) { + return; + } + + currentSlice++; sink.writeAscii("abcdefghij"); - return null; - }).when(formattedExposition).nextSlice(any(NioExpositionSink.class)); + } - assertThat(channel.read(buffer)).isEqualTo(10); - assertThat(channel.isOpen()).isEqualTo(false); + @Override + public boolean isEndOfInput() { + return currentSlice >= slices; + } } }