Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize usage of the Netty's and MsgPack's buffers #438

Merged
merged 4 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.9.0</version>
<version>0.9.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/tarantool/driver/api/TarantoolClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ public class TarantoolClientConfig {
private static final int DEFAULT_CONNECTIONS = 1;
private static final int DEFAULT_CURSOR_BATCH_SIZE = 100;
private static final int DEFAULT_EVENT_LOOP_THREADS_NUMBER = 0;
private static final int DEFAULT_WRITE_BATCH_SIZE = 128;

private TarantoolCredentials credentials;
private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
private int readTimeout = DEFAULT_READ_TIMEOUT;
private int requestTimeout = DEFAULT_REQUEST_TIMEOUT;
private int connections = DEFAULT_CONNECTIONS;
private int eventLoopThreadsNumber = DEFAULT_EVENT_LOOP_THREADS_NUMBER;
private int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;
private MessagePackMapper messagePackMapper =
DefaultMessagePackMapperFactory.getInstance().defaultComplexTypesMapper();
private ConnectionSelectionStrategyFactory connectionSelectionStrategyFactory =
Expand Down Expand Up @@ -63,6 +65,7 @@ public TarantoolClientConfig(TarantoolClientConfig config) {
this.isSecure.set(config.isSecure.get());
this.sslContext = config.getSslContext();
this.eventLoopThreadsNumber = config.getEventLoopThreadsNumber();
this.writeBatchSize = config.getWriteBatchSize();
}

/**
Expand Down Expand Up @@ -268,6 +271,24 @@ public void setEventLoopThreadsNumber(int eventLoopThreadsNumber) {
this.eventLoopThreadsNumber = eventLoopThreadsNumber;
}

/**
* Get maximum number of requests to be sent in one batch to the server.
*
* @return a positive integer value
*/
public int getWriteBatchSize() {
return writeBatchSize;
}

/**
* Set maximum number of requests to be sent in one batch to the server.
*
* @param writeBatchSize maximum number of requests in batch
*/
public void setWriteBatchSize(int writeBatchSize) {
this.writeBatchSize = writeBatchSize;
}

/**
* A builder for {@link TarantoolClientConfig}
*/
Expand Down Expand Up @@ -419,6 +440,20 @@ public Builder withEventLoopThreadsNumber(int eventLoopThreadsNumber) {
return this;
}

/**
* Specify request batch size. Increase this number if this client instance does much
* more writes than reads, but the bigger is this number, the more memory will be used by the
* client and the bigger will be the amount of outbound bytes sent at once. Default is 128
*
* @param writeBatchSize maximum number of requests in batch
* @return builder
*/
public Builder withWriteBatchSize(int writeBatchSize) {
Assert.state(writeBatchSize > 0, "WriteBatchSize should be equal or greater than 0");
config.setWriteBatchSize(writeBatchSize);
return this;
}

/**
* Build a {@link TarantoolClientConfig} instance
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,58 @@
package io.tarantool.driver.codecs;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.tarantool.driver.protocol.TarantoolResponse;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessageUnpacker;
import org.msgpack.core.buffer.ByteBufferInput;

import java.util.List;
import java.nio.ByteBuffer;

/**
* Converts Tarantool server responses from MessagePack frames to Java objects
*
* @author Alexey Kuzin
*/
public class MessagePackFrameDecoder extends ReplayingDecoder<MessagePackFrameDecoder.DecoderState> {
public class MessagePackFrameDecoder extends ByteToMessageDecoder {

private static final int MINIMAL_HEADER_SIZE = 5; // MP_UINT32
private static final int MINIMAL_BODY_SIZE = 1 * 1024 * 1024; // 1 MB
private int size;

public MessagePackFrameDecoder() {
super(DecoderState.LENGTH);
}
private final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(MINIMAL_HEADER_SIZE);
private final ByteBufferInput lenBufferInput = new ByteBufferInput(lenBuffer);
private final MessageUnpacker lenUnpacker = new MessagePack.UnpackerConfig().newUnpacker(lenBufferInput);
private final ByteBuffer bodyBuffer = ByteBuffer.allocateDirect(MINIMAL_BODY_SIZE);
akudiyar marked this conversation as resolved.
Show resolved Hide resolved
private final ByteBufferInput bodyBufferInput = new ByteBufferInput(bodyBuffer);
private final MessageUnpacker bodyUnpacker = new MessagePack.UnpackerConfig().newUnpacker(bodyBufferInput);
ArtDu marked this conversation as resolved.
Show resolved Hide resolved

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
throws Exception {
if (byteBuf.readableBytes() < MINIMAL_HEADER_SIZE) {
return;
}

switch (state()) {
case LENGTH:
ByteBuf lenBuf = byteBuf.readBytes(MINIMAL_HEADER_SIZE);
try (ByteBufInputStream in = new ByteBufInputStream(lenBuf)) {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(in);
size = unpacker.unpackInt();
unpacker.close();
checkpoint(DecoderState.BODY);
}
lenBuf.release();
case BODY:
if (size > 0) {
if (byteBuf.readableBytes() < size) {
return;
}
ByteBuf bodyBuf = byteBuf.readBytes(size);
try (ByteBufInputStream in = new ByteBufInputStream(bodyBuf)) {
MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(in);
list.add(TarantoolResponse.fromMessagePack(unpacker));
unpacker.close();
size = 0;
}
bodyBuf.release();
}
checkpoint(DecoderState.LENGTH);
break;
default:
throw new Error("Shouldn't reach here.");
byteBuf.markReaderIndex();
lenBuffer.clear();
lenBufferInput.reset(lenBuffer);
lenUnpacker.reset(lenBufferInput);
byteBuf.readBytes(lenBuffer);
size = lenUnpacker.unpackInt();

if (byteBuf.readableBytes() < size) {
byteBuf.resetReaderIndex();
return;
}
}

protected enum DecoderState {
LENGTH,
BODY
bodyBuffer.clear();
bodyBuffer.limit(size);
bodyBufferInput.reset(bodyBuffer);
bodyUnpacker.reset(bodyBufferInput);
byteBuf.readBytes(bodyBuffer);
list.add(TarantoolResponse.fromMessagePack(bodyUnpacker));
size = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
import io.netty.handler.codec.MessageToByteEncoder;
import io.tarantool.driver.mappers.MessagePackObjectMapper;
import io.tarantool.driver.protocol.TarantoolRequest;
import org.msgpack.core.MessageBufferPacker;

import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.buffer.ArrayBufferOutput;
import org.msgpack.core.buffer.MessageBuffer;

/**
* Converts Tarantool requests from Java objects to MessagePack frames
Expand All @@ -15,27 +18,36 @@
*/
public class MessagePackFrameEncoder extends MessageToByteEncoder<TarantoolRequest> {

private static final int MINIMAL_HEADER_SIZE = 5; // MP_UINT32
private static final int MINIMAL_HEADER_SIZE = 8; // MP_UINT32
private static final int MINIMAL_BODY_SIZE = 1 * 1024 * 1024; // 1 MB
private final MessagePackObjectMapper mapper;
private final ArrayBufferOutput lenBufferOutput = new ArrayBufferOutput(MINIMAL_HEADER_SIZE);
private final MessagePacker lenPacker = new MessagePack.PackerConfig().newPacker(lenBufferOutput);
private final ArrayBufferOutput bodyBufferOutput = new ArrayBufferOutput(MINIMAL_BODY_SIZE);
private final MessagePacker bodyPacker = new MessagePack.PackerConfig().newPacker(bodyBufferOutput);

public MessagePackFrameEncoder(MessagePackObjectMapper mapper) {
super();
this.mapper = mapper;
}

@Override
protected void encode(
ChannelHandlerContext ctx, TarantoolRequest tarantoolRequest,
ByteBuf byteBuf) throws Exception {
MessageBufferPacker packer = MessagePack.newDefaultBufferPacker();
tarantoolRequest.toMessagePack(packer, mapper);
long outputSize = packer.getTotalWrittenBytes();
byteBuf.capacity((int) (outputSize + MINIMAL_HEADER_SIZE));
byte[] output = packer.toByteArray();
packer.clear();
packer.packLong(outputSize);
byteBuf.writeBytes(packer.toByteArray());
packer.close();
byteBuf.writeBytes(output);
protected void encode(ChannelHandlerContext ctx, TarantoolRequest tarantoolRequest, ByteBuf byteBuf)
throws Exception {
bodyBufferOutput.clear();
bodyPacker.clear();
tarantoolRequest.toMessagePack(bodyPacker, mapper);
bodyPacker.flush();
MessageBuffer bodyBuffer = bodyBufferOutput.toMessageBuffer();
lenBufferOutput.clear();
lenPacker.clear();
lenPacker.packLong(bodyBuffer.size());
lenPacker.flush();
MessageBuffer lenBuffer = lenBufferOutput.toMessageBuffer();
byteBuf.capacity(bodyBuffer.size() + lenBuffer.size());
byteBuf.writeBytes(
lenBufferOutput.toMessageBuffer().sliceAsByteBuffer(0, lenBuffer.size()));
byteBuf.writeBytes(
bodyBufferOutput.toMessageBuffer().sliceAsByteBuffer(0, bodyBuffer.size()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslContext;
import io.tarantool.driver.TarantoolVersionHolder;
import io.tarantool.driver.api.TarantoolClientConfig;
Expand Down Expand Up @@ -56,7 +57,9 @@ protected void initChannel(SocketChannel socketChannel) {
}

// greeting and authentication (will be removed after successful authentication)
pipeline.addLast("TarantoolAuthenticationHandler",
pipeline
.addLast("FlushConsolidationHandler", new FlushConsolidationHandler(config.getWriteBatchSize(), true))
.addLast("TarantoolAuthenticationHandler",
new TarantoolAuthenticationHandler<>(
connectionFuture,
versionHolder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,21 +249,22 @@ private CompletableFuture<List<TarantoolConnection>> establishConnectionsToEndpo
List<CompletableFuture<TarantoolConnection>> connections = connectionFactory
.multiConnection(serverAddress.getSocketAddress(), connectionCount, connectionListeners).stream()
.peek(cf -> cf.thenApply(conn -> {
if (conn.isConnected()) {
if (conn != null && conn.isConnected()) {
logger.info("Connected to Tarantool server at {}", conn.getRemoteAddress());

conn.addConnectionFailureListener((c, ex) -> {
// Connection lost, signal the next thread coming
// for connection to start the init sequence
connectionMode.set(ConnectionMode.PARTIAL);
try {
c.close();
} catch (Exception e) {
logger.info("Failed to close the connection: {}", e.getMessage());
}
});
conn.addConnectionCloseListener(
c -> logger.info("Disconnected from {}", c.getRemoteAddress()));
}
conn.addConnectionFailureListener((c, ex) -> {
// Connection lost, signal the next thread coming
// for connection to start the init sequence
connectionMode.set(ConnectionMode.PARTIAL);
try {
c.close();
} catch (Exception e) {
logger.info("Failed to close the connection: {}", e.getMessage());
}
});
conn.addConnectionCloseListener(
c -> logger.info("Disconnected from {}", c.getRemoteAddress()));
return conn;
})
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public CompletableFuture<TarantoolConnection> singleConnection(

return result.handle((connection, ex) -> {
if (ex != null) {
logger.warn("Connection failed: {}", ex.getMessage());
logger.error(String.format("Failed to connect to the Tarantool server at %s", serverAddress), ex);
future.channel().close();
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public TarantoolRequestHandler(RequestFutureManager futureManager) {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
TarantoolRequest request = (TarantoolRequest) msg;
ctx.write(request).addListener((ChannelFutureListener) channelFuture -> {
ctx.writeAndFlush(request, promise).addListener((ChannelFutureListener) channelFuture -> {
if (!channelFuture.isSuccess()) {
TarantoolRequestMetadata requestMeta = futureManager.getRequest(request.getHeader().getSync());
// The request metadata may has been deleted already after timeout
Expand Down
Loading