Skip to content

Commit

Permalink
Refactor NatsMessage to use ByteBuffer internally.
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshilliard committed Oct 20, 2020
1 parent b77bd65 commit 7c3e96e
Showing 1 changed file with 112 additions and 85 deletions.
197 changes: 112 additions & 85 deletions src/main/java/io/nats/client/impl/NatsMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import io.nats.client.Connection;
import io.nats.client.Message;
Expand All @@ -26,103 +26,71 @@ class NatsMessage implements Message {
private String sid;
private String subject;
private String replyTo;
private byte[] data;
private byte[] protocolBytes;
private ByteBuffer data;
private ByteBuffer protocolBytes;
private NatsSubscription subscription;
private long sizeInBytes;

NatsMessage next; // for linked list

static final byte[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'};

static int copy(byte[] dest, int pos, String toCopy) {
for (int i=0, max=toCopy.length(); i<max ;i++) {
dest[pos] = (byte) toCopy.charAt(i);
pos++;
}

return pos;
}
private Integer protocolLength = null;

private static String PUB_SPACE = NatsConnection.OP_PUB + " ";
private static String SPACE = " ";
NatsMessage next; // for linked list

// Create a message to publish
NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
NatsMessage(String subject, String replyTo, ByteBuffer data, boolean utf8mode) {
this.subject = subject;
this.replyTo = replyTo;
this.data = data;

if (utf8mode) {
int subjectSize = subject.length() * 2;
int replySize = (replyTo != null) ? replyTo.length() * 2 : 0;
StringBuilder protocolStringBuilder = new StringBuilder(4 + subjectSize + 1 + replySize + 1);
protocolStringBuilder.append(PUB_SPACE);
protocolStringBuilder.append(subject);
protocolStringBuilder.append(SPACE);

if (replyTo != null) {
protocolStringBuilder.append(replyTo);
protocolStringBuilder.append(SPACE);
}

protocolStringBuilder.append(String.valueOf(data.length));
Charset charset;

this.protocolBytes = protocolStringBuilder.toString().getBytes(StandardCharsets.UTF_8);
} else {
// Convert the length to bytes
byte[] lengthBytes = new byte[12];
int idx = lengthBytes.length;
int size = (data != null) ? data.length : 0;

if (size > 0) {
for (int i = size; i > 0; i /= 10) {
idx--;
lengthBytes[idx] = digits[i % 10];
}
// Calculate the length in bytes
int size = (data != null) ? data.limit() : 0;
int len = 4;
len += fastIntLength(size);
if (replyTo != null) {
if (utf8mode) {
len += fastUtf8Length(replyTo) + 1;
} else {
idx--;
lengthBytes[idx] = digits[0];
}

// Build the array
int len = 4 + subject.length() + 1 + (lengthBytes.length - idx);

if (replyTo != null) {
len += replyTo.length() + 1;
}
}
if (utf8mode) {
len += fastUtf8Length(subject) + 1;
charset = StandardCharsets.UTF_8;
} else {
len += subject.length() + 1;
charset = StandardCharsets.US_ASCII;
}
this.protocolBytes = ByteBuffer.allocate(len);
protocolBytes.put((byte)'P').put((byte)'U').put((byte)'B').put((byte)' ');
protocolBytes.put(subject.getBytes(charset));
protocolBytes.put((byte)' ');

this.protocolBytes = new byte[len];

// Copy everything
int pos = 0;
protocolBytes[0] = 'P';
protocolBytes[1] = 'U';
protocolBytes[2] = 'B';
protocolBytes[3] = ' ';
pos = 4;
pos = copy(protocolBytes, pos, subject);
protocolBytes[pos] = ' ';
pos++;

if (replyTo != null) {
pos = copy(protocolBytes, pos, replyTo);
protocolBytes[pos] = ' ';
pos++;
}
if (replyTo != null) {
protocolBytes.put(replyTo.getBytes(charset));
protocolBytes.put((byte)' ');
}

System.arraycopy(lengthBytes, idx, protocolBytes, pos, lengthBytes.length - idx);
if (size > 0) {
int base = protocolBytes.limit();
for (int i = size; i > 0; i /= 10) {
base--;
protocolBytes.put(base, (byte)(i % 10 + 0x30));
}
} else {
protocolBytes.put((byte)0x30);
}
protocolBytes.clear();
}

this.sizeInBytes = this.protocolBytes.length + data.length + 4;// for 2x \r\n
NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
this(subject, replyTo, ByteBuffer.wrap(data), utf8mode);
}

// Create a protocol only message to publish
NatsMessage(CharBuffer protocol) {
ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(protocol);
this.protocolBytes = Arrays.copyOfRange(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
Arrays.fill(byteBuffer.array(), (byte) 0); // clear sensitive data
this.sizeInBytes = this.protocolBytes.length + 2;// for \r\n
protocol.mark();
this.protocolBytes = ByteBuffer.allocate(fastUtf8Length(protocol));
protocol.reset();
StandardCharsets.UTF_8.newEncoder().encode(protocol, this.protocolBytes, true);
protocolBytes.clear();
}

// Create an incoming message for a subscriber
Expand All @@ -133,24 +101,84 @@ static int copy(byte[] dest, int pos, String toCopy) {
if (replyTo != null) {
this.replyTo = replyTo;
}
this.sizeInBytes = protocolLength + 2;
this.protocolLength = protocolLength;
this.data = null; // will set data and size after we read it
}

private static int fastUtf8Length(CharSequence cs) {
return cs.length() + cs
.codePoints()
.filter(cp -> cp>0x7f)
.map(cp -> cp<=0x7ff? 1: 2)
.sum();
}

private static int fastIntLength(int number) {
if (number < 100000) {
if (number < 100) {
if (number < 10) {
return 1;
} else {
return 2;
}
} else {
if (number < 1000) {
return 3;
} else {
if (number < 10000) {
return 4;
} else {
return 5;
}
}
}
} else {
if (number < 10000000) {
if (number < 1000000) {
return 6;
} else {
return 7;
}
} else {
if (number < 100000000) {
return 8;
} else {
if (number < 1000000000) {
return 9;
} else {
return 10;
}
}
}
}
}

boolean isProtocol() {
return this.subject == null;
}

// Will be null on an incoming message
byte[] getProtocolBytes() {
return this.protocolBytes;
return this.protocolBytes.array();
}

int getControlLineLength() {
return (this.protocolBytes != null) ? this.protocolBytes.length + 2 : -1;
return (this.protocolBytes != null) ? this.protocolBytes.limit() + 2 : -1;
}

long getSizeInBytes() {
long sizeInBytes = 0;
if (this.protocolBytes != null) {
sizeInBytes += this.protocolBytes.limit();
}
if (this.protocolLength != null){
sizeInBytes += this.protocolLength;
}
if (data != null) {
sizeInBytes += data.limit() + 4;// for 2x \r\n
} else {
sizeInBytes += 2;
}
return sizeInBytes;
}

Expand All @@ -160,8 +188,7 @@ public String getSID() {

// Only for incoming messages, with no protocol bytes
void setData(byte[] data) {
this.data = data;
this.sizeInBytes += data.length + 2;// for \r\n, we already set the length for the protocol bytes in the constructor
this.data = ByteBuffer.wrap(data);
}

void setSubscription(NatsSubscription sub) {
Expand Down Expand Up @@ -189,7 +216,7 @@ public String getReplyTo() {
}

public byte[] getData() {
return this.data;
return this.data.array();
}

public Subscription getSubscription() {
Expand Down

0 comments on commit 7c3e96e

Please sign in to comment.