Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor NatsMessage to use ByteBuffer internally. #349

Merged
merged 1 commit into from
Oct 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 125 additions & 85 deletions src/main/java/io/nats/client/impl/NatsMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import io.nats.client.Connection;
import io.nats.client.Message;
Expand All @@ -26,103 +26,74 @@ class NatsMessage implements Message {
private String sid;
private String subject;
private String replyTo;
private byte[] data;
private byte[] protocolBytes;
private ByteBuffer data;
private ByteBuffer protocolBytes;
private NatsSubscription subscription;
private long sizeInBytes;

NatsMessage next; // for linked list

static final byte[] digits = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'};
Copy link
Contributor Author

@jameshilliard jameshilliard Oct 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't needed, should be faster to just add 0x30 to convert a byte to ascii.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add to the PR, should be ok - although i might - since i am paranoid - convert 0 to a byte to get the value :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ASCII "0" to byte gives you 0x30, I'm doing this in integer mode then casting to byte when writing to the buffer, at least I think that's probably the most efficient way to do it.


static int copy(byte[] dest, int pos, String toCopy) {
for (int i=0, max=toCopy.length(); i<max ;i++) {
dest[pos] = (byte) toCopy.charAt(i);
pos++;
}
private Integer protocolLength = null;

return pos;
}

private static String PUB_SPACE = NatsConnection.OP_PUB + " ";
private static String SPACE = " ";
NatsMessage next; // for linked list

// Create a message to publish
NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
NatsMessage(String subject, String replyTo, ByteBuffer data, boolean utf8mode) {
this.subject = subject;
this.replyTo = replyTo;
this.data = data;

if (utf8mode) {
int subjectSize = subject.length() * 2;
int replySize = (replyTo != null) ? replyTo.length() * 2 : 0;
StringBuilder protocolStringBuilder = new StringBuilder(4 + subjectSize + 1 + replySize + 1);
protocolStringBuilder.append(PUB_SPACE);
protocolStringBuilder.append(subject);
protocolStringBuilder.append(SPACE);

if (replyTo != null) {
protocolStringBuilder.append(replyTo);
protocolStringBuilder.append(SPACE);
}

protocolStringBuilder.append(String.valueOf(data.length));
Charset charset;

this.protocolBytes = protocolStringBuilder.toString().getBytes(StandardCharsets.UTF_8);
} else {
// Convert the length to bytes
byte[] lengthBytes = new byte[12];
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to write this directly to protocolBytes.

int idx = lengthBytes.length;
int size = (data != null) ? data.length : 0;

if (size > 0) {
for (int i = size; i > 0; i /= 10) {
idx--;
lengthBytes[idx] = digits[i % 10];
}
// Calculate the length in bytes
int size = (data != null) ? data.limit() : 0;
int len = fastIntLength(size) + 4;
if (replyTo != null) {
if (utf8mode) {
len += fastUtf8Length(replyTo) + 1;
} else {
idx--;
lengthBytes[idx] = digits[0];
}

// Build the array
int len = 4 + subject.length() + 1 + (lengthBytes.length - idx);

if (replyTo != null) {
len += replyTo.length() + 1;
}
}
if (utf8mode) {
len += fastUtf8Length(subject) + 1;
charset = StandardCharsets.UTF_8;
} else {
len += subject.length() + 1;
charset = StandardCharsets.US_ASCII;
}
this.protocolBytes = ByteBuffer.allocate(len);
protocolBytes.put((byte)'P').put((byte)'U').put((byte)'B').put((byte)' ');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be faster to use a backing array, initialized with PUB vs put APIs? Not sure which is faster in java...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall this was faster, and it will have better interoperability with the next round of changes which use ByteBuffer's in more places, ByteBuffer should be optimized for these types of operations.

protocolBytes.put(subject.getBytes(charset));
protocolBytes.put((byte)' ');

this.protocolBytes = new byte[len];

// Copy everything
int pos = 0;
protocolBytes[0] = 'P';
protocolBytes[1] = 'U';
protocolBytes[2] = 'B';
protocolBytes[3] = ' ';
pos = 4;
pos = copy(protocolBytes, pos, subject);
protocolBytes[pos] = ' ';
pos++;

if (replyTo != null) {
pos = copy(protocolBytes, pos, replyTo);
protocolBytes[pos] = ' ';
pos++;
}
if (replyTo != null) {
protocolBytes.put(replyTo.getBytes(charset));
protocolBytes.put((byte)' ');
}

System.arraycopy(lengthBytes, idx, protocolBytes, pos, lengthBytes.length - idx);
if (size > 0) {
int base = protocolBytes.limit();
for (int i = size; i > 0; i /= 10) {
base--;
protocolBytes.put(base, (byte)(i % 10 + (byte)'0'));
}
} else {
protocolBytes.put((byte)'0');
}
protocolBytes.clear();
}

this.sizeInBytes = this.protocolBytes.length + data.length + 4;// for 2x \r\n
NatsMessage(String subject, String replyTo, byte[] data, boolean utf8mode) {
this(subject, replyTo, ByteBuffer.wrap(data), utf8mode);
}

// Create a protocol only message to publish
NatsMessage(CharBuffer protocol) {
ByteBuffer byteBuffer = StandardCharsets.UTF_8.encode(protocol);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not allocate a correctly sized ByteBuffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this sort of issue is one of the reasons I'm trying to move away from any low level char/string based protocol encoding/decoding to UTF-8/ASCII byte/bytebuffer based encoding/decoding as much as possible.

this.protocolBytes = Arrays.copyOfRange(byteBuffer.array(), byteBuffer.position(), byteBuffer.limit());
Arrays.fill(byteBuffer.array(), (byte) 0); // clear sensitive data
this.sizeInBytes = this.protocolBytes.length + 2;// for \r\n
if (protocol.remaining() == 0) {
this.protocolBytes = ByteBuffer.allocate(0);
} else {
protocol.mark();
this.protocolBytes = ByteBuffer.allocate(fastUtf8Length(protocol));
protocol.reset();
StandardCharsets.UTF_8.newEncoder().encode(protocol, this.protocolBytes, true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use a static instance rather than allocating every new msg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recall I tested that and this was a good bit faster in benchmarks, not 100% sure why, best guess is having multiple threads accessing the same static encoder instance causes cache flushing or something. I plan to refactor this anyways down the line once I refactor some prerequisite code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, verified this has a major performance impact.
Static instance:

5:59:11 PM: Executing task 'MessageProtocolCreationBenchmark.main()'...

> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE
> Task :compileTestJava UP-TO-DATE
> Task :processTestResources UP-TO-DATE
> Task :testClasses UP-TO-DATE

> Task :MessageProtocolCreationBenchmark.main()
### Running benchmarks with 50,000,000 messages.

### Total time to create 50,000,000 non-utf8 messages for sending was 1,835 ms
	36.718829 ns/op
	27,233,984.034 op/sec

### Total time to create 50,000,000 utf8 messages for sending was 3,552 ms
	71.053063 ns/op
	14,073,988.718 op/sec

### Total time to create 50,000,000 a protocol message was 341 ms
	6.833834 ns/op
	146,330,744.581 op/sec

BUILD SUCCESSFUL in 6s
4 actionable tasks: 1 executed, 3 up-to-date
5:59:17 PM: Task execution finished 'MessageProtocolCreationBenchmark.main()'.

non-static(current) implementation:

5:58:33 PM: Executing task 'MessageProtocolCreationBenchmark.main()'...


> Task :compileJava

> Task :processResources NO-SOURCE
> Task :classes
> Task :compileTestJava UP-TO-DATE
> Task :processTestResources UP-TO-DATE
> Task :testClasses UP-TO-DATE
Note: Some input files use or override a deprecated API.
Note: Recompile with -Xlint:deprecation for details.

> Task :MessageProtocolCreationBenchmark.main()
### Running benchmarks with 50,000,000 messages.

### Total time to create 50,000,000 non-utf8 messages for sending was 1,802 ms
	36.054858 ns/op
	27,735,513.355 op/sec

### Total time to create 50,000,000 utf8 messages for sending was 2,841 ms
	56.833210 ns/op
	17,595,346.163 op/sec

### Total time to create 50,000,000 a protocol message was 334 ms
	6.693376 ns/op
	149,401,436.293 op/sec

BUILD SUCCESSFUL in 5s
4 actionable tasks: 2 executed, 2 up-to-date
5:58:39 PM: Task execution finished 'MessageProtocolCreationBenchmark.main()'.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, would have lost a bet on that one. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also noticed this here:

Instances of this class are not safe for use by multiple concurrent threads.

protocolBytes.clear();
}
}

// Create an incoming message for a subscriber
Expand All @@ -133,24 +104,94 @@ static int copy(byte[] dest, int pos, String toCopy) {
if (replyTo != null) {
this.replyTo = replyTo;
}
this.sizeInBytes = protocolLength + 2;
this.protocolLength = protocolLength;
this.data = null; // will set data and size after we read it
}

private static int fastUtf8Length(CharSequence cs) {
int count = 0;
for (int i = 0, len = cs.length(); i < len; i++) {
char ch = cs.charAt(i);
if (ch <= 0x7F) {
count++;
} else if (ch <= 0x7FF) {
count += 2;
} else if (Character.isHighSurrogate(ch)) {
count += 4;
++i;
} else {
count += 3;
}
}
return count;
}

private static int fastIntLength(int number) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation while a bit verbose is significantly faster than a naive implementation and allows us to eliminate a separate length buffer which lets us write directly to protocolBytes avoiding some somewhat expensive copy operations.

if (number < 100000) {
if (number < 100) {
if (number < 10) {
return 1;
} else {
return 2;
}
} else {
if (number < 1000) {
return 3;
} else {
if (number < 10000) {
return 4;
} else {
return 5;
}
}
}
} else {
if (number < 10000000) {
if (number < 1000000) {
return 6;
} else {
return 7;
}
} else {
if (number < 100000000) {
return 8;
} else {
if (number < 1000000000) {
return 9;
} else {
return 10;
}
}
}
}
}

boolean isProtocol() {
return this.subject == null;
}

// Will be null on an incoming message
byte[] getProtocolBytes() {
return this.protocolBytes;
return this.protocolBytes.array();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all ByteBuffer's now have correctly allocated sizes we can unconditionally use the fast path here.

}

int getControlLineLength() {
return (this.protocolBytes != null) ? this.protocolBytes.length + 2 : -1;
return (this.protocolBytes != null) ? this.protocolBytes.limit() + 2 : -1;
}

long getSizeInBytes() {
long sizeInBytes = 0;
if (this.protocolBytes != null) {
sizeInBytes += this.protocolBytes.limit();
}
if (this.protocolLength != null){
sizeInBytes += this.protocolLength;
}
if (data != null) {
sizeInBytes += data.limit() + 4;// for 2x \r\n
} else {
sizeInBytes += 2;
}
return sizeInBytes;
}

Expand All @@ -160,8 +201,7 @@ public String getSID() {

// Only for incoming messages, with no protocol bytes
void setData(byte[] data) {
this.data = data;
this.sizeInBytes += data.length + 2;// for \r\n, we already set the length for the protocol bytes in the constructor
this.data = ByteBuffer.wrap(data);
}

void setSubscription(NatsSubscription sub) {
Expand Down Expand Up @@ -189,7 +229,7 @@ public String getReplyTo() {
}

public byte[] getData() {
return this.data;
return this.data.array();
}

public Subscription getSubscription() {
Expand Down