Skip to content

Commit

Permalink
Refactor NatsMessage to use ByteBuffer internally. (#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshilliard authored Oct 22, 2020
1 parent b77bd65 commit f154624
Showing 1 changed file with 125 additions and 85 deletions.
210 changes: 125 additions & 85 deletions src/main/java/io/nats/client/impl/NatsMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import io.nats.client.Connection;
import io.nats.client.Message;
Expand All @@ -26,103 +26,74 @@ class NatsMessage implements Message {
private String sid;
private String subject;
private String replyTo;
private byte[] data;
private byte[] protocolBytes;
private ByteBuffer data;
private ByteBuffer protocolBytes;
private NatsSubscription subscription;
private long sizeInBytes;

NatsMessage next; // for linked list

static final byte[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'};

static int copy(byte[] dest, int pos, String toCopy) {
for (int i=0, max=toCopy.length(); i<max ;i++) {
dest[pos] = (byte) toCopy.charAt(i);
pos++;
}
private Integer protocolLength = null;

return pos;
}

private static String PUB_SPACE = NatsConnection.OP_PUB + " ";
private static String SPACE = " ";
NatsMessage next; // for linked list

// Create a message to publish
NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
NatsMessage(String subject, String replyTo, ByteBuffer data, boolean utf8mode) {
this.subject = subject;
this.replyTo = replyTo;
this.data = data;

if (utf8mode) {
int subjectSize = subject.length() * 2;
int replySize = (replyTo != null) ? replyTo.length() * 2 : 0;
StringBuilder protocolStringBuilder = new StringBuilder(4 + subjectSize + 1 + replySize + 1);
protocolStringBuilder.append(PUB_SPACE);
protocolStringBuilder.append(subject);
protocolStringBuilder.append(SPACE);

if (replyTo != null) {
protocolStringBuilder.append(replyTo);
protocolStringBuilder.append(SPACE);
}

protocolStringBuilder.append(String.valueOf(data.length));
Charset charset;

this.protocolBytes = protocolStringBuilder.toString().getBytes(StandardCharsets.UTF_8);
} else {
// Convert the length to bytes
byte[] lengthBytes = new byte[12];
int idx = lengthBytes.length;
int size = (data != null) ? data.length : 0;

if (size > 0) {
for (int i = size; i > 0; i /= 10) {
idx--;
lengthBytes[idx] = digits[i % 10];
}
// Calculate the length in bytes
int size = (data != null) ? data.limit() : 0;
int len = fastIntLength(size) + 4;
if (replyTo != null) {
if (utf8mode) {
len += fastUtf8Length(replyTo) + 1;
} else {
idx--;
lengthBytes[idx] = digits[0];
}

// Build the array
int len = 4 + subject.length() + 1 + (lengthBytes.length - idx);

if (replyTo != null) {
len += replyTo.length() + 1;
}
}
if (utf8mode) {
len += fastUtf8Length(subject) + 1;
charset = StandardCharsets.UTF_8;
} else {
len += subject.length() + 1;
charset = StandardCharsets.US_ASCII;
}
this.protocolBytes = ByteBuffer.allocate(len);
protocolBytes.put((byte)'P').put((byte)'U').put((byte)'B').put((byte)' ');
protocolBytes.put(subject.getBytes(charset));
protocolBytes.put((byte)' ');

this.protocolBytes = new byte[len];

// Copy everything
int pos = 0;
protocolBytes[0] = 'P';
protocolBytes[1] = 'U';
protocolBytes[2] = 'B';
protocolBytes[3] = ' ';
pos = 4;
pos = copy(protocolBytes, pos, subject);
protocolBytes[pos] = ' ';
pos++;

if (replyTo != null) {
pos = copy(protocolBytes, pos, replyTo);
protocolBytes[pos] = ' ';
pos++;
}
if (replyTo != null) {
protocolBytes.put(replyTo.getBytes(charset));
protocolBytes.put((byte)' ');
}

System.arraycopy(lengthBytes, idx, protocolBytes, pos, lengthBytes.length - idx);
if (size > 0) {
int base = protocolBytes.limit();
for (int i = size; i > 0; i /= 10) {
base--;
protocolBytes.put(base, (byte)(i % 10 + (byte)'0'));
}
} else {
protocolBytes.put((byte)'0');
}
protocolBytes.clear();
}

this.sizeInBytes = this.protocolBytes.length + data.length + 4;// for 2x \r\n
NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
this(subject, replyTo, ByteBuffer.wrap(data), utf8mode);
}

// Create a protocol only message to publish
NatsMessage(CharBuffer protocol) {
ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(protocol);
this.protocolBytes = Arrays.copyOfRange(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
Arrays.fill(byteBuffer.array(), (byte) 0); // clear sensitive data
this.sizeInBytes = this.protocolBytes.length + 2;// for \r\n
if (protocol.remaining() == 0) {
this.protocolBytes = ByteBuffer.allocate(0);
} else {
protocol.mark();
this.protocolBytes = ByteBuffer.allocate(fastUtf8Length(protocol));
protocol.reset();
StandardCharsets.UTF_8.newEncoder().encode(protocol, this.protocolBytes, true);
protocolBytes.clear();
}
}

// Create an incoming message for a subscriber
Expand All @@ -133,24 +104,94 @@ static int copy(byte[] dest, int pos, String toCopy) {
if (replyTo != null) {
this.replyTo = replyTo;
}
this.sizeInBytes = protocolLength + 2;
this.protocolLength = protocolLength;
this.data = null; // will set data and size after we read it
}

private static int fastUtf8Length(CharSequence cs) {
int count = 0;
for (int i = 0, len = cs.length(); i < len; i++) {
char ch = cs.charAt(i);
if (ch <= 0x7F) {
count++;
} else if (ch <= 0x7FF) {
count += 2;
} else if (Character.isHighSurrogate(ch)) {
count += 4;
++i;
} else {
count += 3;
}
}
return count;
}

private static int fastIntLength(int number) {
if (number < 100000) {
if (number < 100) {
if (number < 10) {
return 1;
} else {
return 2;
}
} else {
if (number < 1000) {
return 3;
} else {
if (number < 10000) {
return 4;
} else {
return 5;
}
}
}
} else {
if (number < 10000000) {
if (number < 1000000) {
return 6;
} else {
return 7;
}
} else {
if (number < 100000000) {
return 8;
} else {
if (number < 1000000000) {
return 9;
} else {
return 10;
}
}
}
}
}

boolean isProtocol() {
return this.subject == null;
}

// Will be null on an incoming message
byte[] getProtocolBytes() {
return this.protocolBytes;
return this.protocolBytes.array();
}

int getControlLineLength() {
return (this.protocolBytes != null) ? this.protocolBytes.length + 2 : -1;
return (this.protocolBytes != null) ? this.protocolBytes.limit() + 2 : -1;
}

long getSizeInBytes() {
long sizeInBytes = 0;
if (this.protocolBytes != null) {
sizeInBytes += this.protocolBytes.limit();
}
if (this.protocolLength != null){
sizeInBytes += this.protocolLength;
}
if (data != null) {
sizeInBytes += data.limit() + 4;// for 2x \r\n
} else {
sizeInBytes += 2;
}
return sizeInBytes;
}

Expand All @@ -160,8 +201,7 @@ public String getSID() {

// Only for incoming messages, with no protocol bytes
void setData(byte[] data) {
this.data = data;
this.sizeInBytes += data.length + 2;// for \r\n, we already set the length for the protocol bytes in the constructor
this.data = ByteBuffer.wrap(data);
}

void setSubscription(NatsSubscription sub) {
Expand Down Expand Up @@ -189,7 +229,7 @@ public String getReplyTo() {
}

public byte[] getData() {
return this.data;
return this.data.array();
}

public Subscription getSubscription() {
Expand Down

0 comments on commit f154624

Please sign in to comment.