From 7c3e96e0de23bd59171a8006fcb8ca53034a2030 Mon Sep 17 00:00:00 2001 From: James Hilliard Date: Fri, 16 Oct 2020 07:07:13 -0600 Subject: [PATCH] Refactor NatsMessage to use ByteBuffer internally. --- .../java/io/nats/client/impl/NatsMessage.java | 197 ++++++++++-------- 1 file changed, 112 insertions(+), 85 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsMessage.java b/src/main/java/io/nats/client/impl/NatsMessage.java index 66546e6c2..f8b1e0e29 100644 --- a/src/main/java/io/nats/client/impl/NatsMessage.java +++ b/src/main/java/io/nats/client/impl/NatsMessage.java @@ -15,8 +15,8 @@ import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import io.nats.client.Connection; import io.nats.client.Message; @@ -26,103 +26,71 @@ class NatsMessage implements Message { private String sid; private String subject; private String replyTo; - private byte[] data; - private byte[] protocolBytes; + private ByteBuffer data; + private ByteBuffer protocolBytes; private NatsSubscription subscription; - private long sizeInBytes; - - NatsMessage next; // for linked list - - static final byte[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}; - - static int copy(byte[] dest, int pos, String toCopy) { - for (int i=0, max=toCopy.length(); i 0) { - for (int i = size; i > 0; i /= 10) { - idx--; - lengthBytes[idx] = digits[i % 10]; - } + // Calculate the length in bytes + int size = (data != null) ? data.limit() : 0; + int len = 4; + len += fastIntLength(size); + if (replyTo != null) { + if (utf8mode) { + len += fastUtf8Length(replyTo) + 1; } else { - idx--; - lengthBytes[idx] = digits[0]; - } - - // Build the array - int len = 4 + subject.length() + 1 + (lengthBytes.length - idx); - - if (replyTo != null) { len += replyTo.length() + 1; } + } + if (utf8mode) { + len += fastUtf8Length(subject) + 1; + charset = StandardCharsets.UTF_8; + } else { + len += subject.length() + 1; + charset = StandardCharsets.US_ASCII; + } + this.protocolBytes = ByteBuffer.allocate(len); + protocolBytes.put((byte)'P').put((byte)'U').put((byte)'B').put((byte)' '); + protocolBytes.put(subject.getBytes(charset)); + protocolBytes.put((byte)' '); - this.protocolBytes = new byte[len]; - - // Copy everything - int pos = 0; - protocolBytes[0] = 'P'; - protocolBytes[1] = 'U'; - protocolBytes[2] = 'B'; - protocolBytes[3] = ' '; - pos = 4; - pos = copy(protocolBytes, pos, subject); - protocolBytes[pos] = ' '; - pos++; - - if (replyTo != null) { - pos = copy(protocolBytes, pos, replyTo); - protocolBytes[pos] = ' '; - pos++; - } + if (replyTo != null) { + protocolBytes.put(replyTo.getBytes(charset)); + protocolBytes.put((byte)' '); + } - System.arraycopy(lengthBytes, idx, protocolBytes, pos, lengthBytes.length - idx); + if (size > 0) { + int base = protocolBytes.limit(); + for (int i = size; i > 0; i /= 10) { + base--; + protocolBytes.put(base, (byte)(i % 10 + 0x30)); + } + } else { + protocolBytes.put((byte)0x30); } + protocolBytes.clear(); + } - this.sizeInBytes = this.protocolBytes.length + data.length + 4;// for 2x \r\n + NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) { + this(subject, replyTo, ByteBuffer.wrap(data), utf8mode); } // Create a protocol only message to publish NatsMessage(CharBuffer protocol) { - ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(protocol); - this.protocolBytes = Arrays.copyOfRange(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit()); - Arrays.fill(byteBuffer.array(), (byte) 0); // clear sensitive data - this.sizeInBytes = this.protocolBytes.length + 2;// for \r\n + protocol.mark(); + this.protocolBytes = ByteBuffer.allocate(fastUtf8Length(protocol)); + protocol.reset(); + StandardCharsets.UTF_8.newEncoder().encode(protocol, this.protocolBytes, true); + protocolBytes.clear(); } // Create an incoming message for a subscriber @@ -133,24 +101,84 @@ static int copy(byte[] dest, int pos, String toCopy) { if (replyTo != null) { this.replyTo = replyTo; } - this.sizeInBytes = protocolLength + 2; + this.protocolLength = protocolLength; this.data = null; // will set data and size after we read it } + private static int fastUtf8Length(CharSequence cs) { + return cs.length() + cs + .codePoints() + .filter(cp -> cp>0x7f) + .map(cp -> cp<=0x7ff? 1: 2) + .sum(); + } + + private static int fastIntLength(int number) { + if (number < 100000) { + if (number < 100) { + if (number < 10) { + return 1; + } else { + return 2; + } + } else { + if (number < 1000) { + return 3; + } else { + if (number < 10000) { + return 4; + } else { + return 5; + } + } + } + } else { + if (number < 10000000) { + if (number < 1000000) { + return 6; + } else { + return 7; + } + } else { + if (number < 100000000) { + return 8; + } else { + if (number < 1000000000) { + return 9; + } else { + return 10; + } + } + } + } + } + boolean isProtocol() { return this.subject == null; } // Will be null on an incoming message byte[] getProtocolBytes() { - return this.protocolBytes; + return this.protocolBytes.array(); } int getControlLineLength() { - return (this.protocolBytes != null) ? this.protocolBytes.length + 2 : -1; + return (this.protocolBytes != null) ? this.protocolBytes.limit() + 2 : -1; } long getSizeInBytes() { + long sizeInBytes = 0; + if (this.protocolBytes != null) { + sizeInBytes += this.protocolBytes.limit(); + } + if (this.protocolLength != null){ + sizeInBytes += this.protocolLength; + } + if (data != null) { + sizeInBytes += data.limit() + 4;// for 2x \r\n + } else { + sizeInBytes += 2; + } return sizeInBytes; } @@ -160,8 +188,7 @@ public String getSID() { // Only for incoming messages, with no protocol bytes void setData(byte[] data) { - this.data = data; - this.sizeInBytes += data.length + 2;// for \r\n, we already set the length for the protocol bytes in the constructor + this.data = ByteBuffer.wrap(data); } void setSubscription(NatsSubscription sub) { @@ -189,7 +216,7 @@ public String getReplyTo() { } public byte[] getData() { - return this.data; + return this.data.array(); } public Subscription getSubscription() {