Skip to content

Commit

Permalink
Force flush of nio buffer when threshold is reached
Browse files Browse the repository at this point in the history
Update unit tests to verify the FormattedByteChannel with the
Netty ChunkedNioStream.

Closes instaclustr#83
  • Loading branch information
eperott committed Mar 31, 2020
1 parent 641b3f4 commit f4243eb
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,32 +1,50 @@
package com.zegelin.prometheus.exposition;

import com.google.common.annotations.VisibleForTesting;

import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class FormattedByteChannel implements ReadableByteChannel {
public static final int MIN_CHUNK_SIZE = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = MIN_CHUNK_SIZE * 5;
public static final int DEFAULT_CHUNK_THRESHOLD = 1024 * 1024;
public static final int MAX_CHUNK_SIZE = DEFAULT_CHUNK_THRESHOLD * 5;

private final FormattedExposition formattedExposition;
private final int chunkThreshold;

public FormattedByteChannel(final FormattedExposition formattedExposition) {
this(formattedExposition, DEFAULT_CHUNK_THRESHOLD);
}

public FormattedByteChannel(FormattedExposition formattedExposition) {
@VisibleForTesting
FormattedByteChannel(final FormattedExposition formattedExposition, final int chunkThreshold) {
this.formattedExposition = formattedExposition;
this.chunkThreshold = chunkThreshold;
}

@Override
public int read(ByteBuffer dst) {
public int read(final ByteBuffer dst) {
if (!isOpen()) {
return -1;
}

NioExpositionSink sink = new NioExpositionSink(dst);
while (sink.getIngestedByteCount() < MIN_CHUNK_SIZE && isOpen()) {
// Forcing the calling ChunkedNioStream to flush the buffer
if (hasBufferReachedChunkThreshold(dst)) {
return -1;
}

final NioExpositionSink sink = new NioExpositionSink(dst);
while (!hasBufferReachedChunkThreshold(dst) && isOpen()) {
formattedExposition.nextSlice(sink);
}

return sink.getIngestedByteCount();
}

private boolean hasBufferReachedChunkThreshold(final ByteBuffer dst) {
return dst.position() >= chunkThreshold;
}

@Override
public boolean isOpen() {
return !formattedExposition.isEndOfInput();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.zegelin.prometheus.exposition;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedNioStream;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.BeforeMethod;
Expand All @@ -8,49 +12,102 @@
import java.nio.ByteBuffer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;

public class TestFormattedByteChannel {
@Mock
private FormattedExposition formattedExposition;
private ChannelHandlerContext ctx;

private ChunkedNioStream chunkedNioStream;

private TenSliceExposition formattedExposition;

private ByteBuffer buffer;
private FormattedByteChannel channel;

@BeforeMethod
public void before() {
MockitoAnnotations.initMocks(this);

buffer = ByteBuffer.allocate(128);
channel = new FormattedByteChannel(formattedExposition);
formattedExposition = new TenSliceExposition();
channel = new FormattedByteChannel(formattedExposition, 64);

when(ctx.alloc()).thenReturn(UnpooledByteBufAllocator.DEFAULT);
chunkedNioStream = new ChunkedNioStream(channel, 128);
}

@Test
public void testClosed() {
when(formattedExposition.isEndOfInput()).thenReturn(true);
formattedExposition.setSlices(0);

assertThat(channel.read(buffer)).isEqualTo(-1);
assertThat(channel.isOpen()).isEqualTo(false);
}

@Test
public void testOpen() {
when(formattedExposition.isEndOfInput()).thenReturn(false);
formattedExposition.setSlices(1);

assertThat(channel.isOpen()).isEqualTo(true);
}

@Test
public void testOneChunk() {
when(formattedExposition.isEndOfInput()).thenReturn(false).thenReturn(false).thenReturn(true);
doAnswer(invocation -> {
NioExpositionSink sink = invocation.getArgument(0);
public void testOneSlice() throws Exception {
formattedExposition.setSlices(1);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(10);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoSlices() throws Exception {
formattedExposition.setSlices(2);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(20);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

@Test
public void testTwoChunks() throws Exception {
formattedExposition.setSlices(10);
ByteBuf byteBuf;

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(70);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(false);

byteBuf = chunkedNioStream.readChunk(ctx);
assertThat(byteBuf.readableBytes()).isEqualTo(30);
assertThat(chunkedNioStream.isEndOfInput()).isEqualTo(true);
}

// A dummy Exposition implementation that will generate a specific number of slices of size 10.
private static class TenSliceExposition implements FormattedExposition {
private int slices = 0;
private int currentSlice = 0;

private void setSlices(final int chunks) {
this.slices = chunks;
}

@Override
public void nextSlice(final ExpositionSink<?> sink) {
if (isEndOfInput()) {
return;
}

currentSlice++;
sink.writeAscii("abcdefghij");
return null;
}).when(formattedExposition).nextSlice(any(NioExpositionSink.class));
}

assertThat(channel.read(buffer)).isEqualTo(10);
assertThat(channel.isOpen()).isEqualTo(false);
@Override
public boolean isEndOfInput() {
return currentSlice >= slices;
}
}
}

0 comments on commit f4243eb

Please sign in to comment.