Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate array copies #31

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/BrokerSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -1219,10 +1221,12 @@ public QueueHandle lookupQueue(QueueId queueId) {
return queueStateManager.findByQueueId(queueId);
}

public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException {
public void post(QueueHandle queueHandle, Collection<PutMessageImpl> msgs) throws BMQException {
Argument.expectNonNull(queueHandle, "queueHandle");
Argument.expectNonNull(msgs, "msgs");
Argument.expectPositive(msgs.length, "message array length");
if (msgs.isEmpty()) {
return;
}

// Queue state guard
QueueState state = queueHandle.getState();
Expand All @@ -1249,6 +1253,14 @@ public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQExce
}
}

public void post(QueueHandle queueHandle, PutMessageImpl msg) throws BMQException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need there there two methods here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.singletonList() is more efficient for a single element than Arrays.asList()

post(queueHandle, Collections.singletonList(msg));
}

public void post(QueueHandle queueHandle, PutMessageImpl... msgs) throws BMQException {
post(queueHandle, Arrays.asList(msgs));
}

public GenericResult confirm(QueueHandle queueHandle, PushMessageImpl... messages) {
Argument.expectNonNull(queueHandle, "queueHandle");
Argument.expectNonNull(messages, "messages");
Expand Down
16 changes: 13 additions & 3 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/PutPoster.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -68,9 +70,9 @@ void setMaxEventSize(int val) {
maxEventSize = Argument.expectNotGreater(val, EventHeader.MAX_SIZE_SOFT, "max event size");
}

public void pack(PutMessageImpl... msgs) {
public void pack(Collection<PutMessageImpl> msgs) {
Argument.expectNonNull(msgs, "msgs");
Argument.expectPositive(msgs.length, "message array length");
Argument.expectPositive(msgs.size(), "message array length");
for (PutMessageImpl m : msgs) {
Argument.expectNonNull(m, "put message");

Expand All @@ -87,11 +89,19 @@ public void flush() {
}
}

public void post(PutMessageImpl... msgs) {
public void post(Collection<PutMessageImpl> msgs) {
pack(msgs);
flush();
}

public void post(PutMessageImpl msg) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here

post(Collections.singletonList(msg));
}

public void post(PutMessageImpl... msgs) {
post(Arrays.asList(msgs));
}

private void sendEvent() {
PutEventBuilder putBuilder = new PutEventBuilder();
putBuilder.setMaxEventSize(maxEventSize);
Expand Down
21 changes: 8 additions & 13 deletions bmq-sdk/src/main/java/com/bloomberg/bmq/impl/QueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +50,7 @@ public class QueueImpl implements QueueHandle {
static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

static final int INVALID_QUEUE_ID = -1;
private static final int INITIAL_PUTMESSAGES_SIZE = 100;

// Immutable fields
private final BrokerSession brokerSession;
Expand All @@ -61,7 +64,8 @@ public class QueueImpl implements QueueHandle {
// Fields exposed to user thread
private final QueueHandleParameters parameters; // mutable object and final field
private volatile QueueState state;
private final ArrayList<PutMessageImpl> putMessages = new ArrayList<>();
private final AtomicReference<Collection<PutMessageImpl>> putMessages =
new AtomicReference<>(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE));
private volatile boolean isSuspended = false;
// Whether the queue is suspended.
// While suspended, a queue receives no
Expand Down Expand Up @@ -261,20 +265,11 @@ public BmqFuture<CloseQueueCode> closeAsync(Duration timeout) {
}

public void pack(PutMessageImpl message) throws BMQException {
synchronized (lock) {
putMessages.add(message);
}
putMessages.get().add(message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose .get().add() is not an atomic operation, so it's possible get the array which is being flushed and add new messages afterward and eventually lost them or modify the collection while it is being iterated in PutPoster?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.get() itself is atomic, and so is the .getAndSet() in flush, so afaik I don't think its possible to get the array that is being flushed

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, get() and getAndSet() are atomic, but the following sequence seems to be possible:
[t1] .get() returns an array to flush
[t2] .getAndSet() returns the same array
[t2] is iterating the array and processing the messages
[t1] calls .add() which modifies the array

Correct?

}

public PutMessageImpl[] flush() throws BMQException {
PutMessageImpl[] msgs;
synchronized (lock) {
msgs = new PutMessageImpl[putMessages.size()];
msgs = putMessages.toArray(msgs);
putMessages.clear();
}
brokerSession.post(this, msgs);
return msgs;
public void flush() throws BMQException {
brokerSession.post(this, putMessages.getAndSet(new ArrayList<>(INITIAL_PUTMESSAGES_SIZE)));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should not we check here if there are messages to send? Otherwise we may just replace empty array with another empty array

}

@Override
Expand Down
Loading