Skip to content

Commit

Permalink
ARTEMIS-4455 - Improve message redistribution balance for OFF_WITH_RE…
Browse files Browse the repository at this point in the history
…DISTRIBUTION
  • Loading branch information
AntonRoskvist committed Oct 13, 2023
1 parent 42be518 commit 067eaa0
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ public Message redistribute(final Message message,

final int bindingsCount = bindings.length;

int nextPosition = bindingIndex.getIndex();
int nextPosition = bindingIndex.getRedistributorIndex();

if (nextPosition >= bindingsCount) {
nextPosition = 0;
Expand Down Expand Up @@ -294,7 +294,7 @@ public Message redistribute(final Message message,
context.setTransaction(new TransactionImpl(storageManager));
}

bindingIndex.setIndex(nextPosition);
bindingIndex.setRedistributorIndex(nextPosition);
nextBinding.route(copyRedistribute, context);
logger.debug("Redistribution successful on message={}, towards bindings={}", message, bindings);
return copyRedistribute;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,61 @@ public interface BindingIndex {
*/
int getIndex();

/**
* Cannot return a negative value and returns {@code 0} if uninitialized.
*/
int getRedistributorIndex();

/**
* Cannot set a negative value.
*/
void setIndex(int v);

/**
* Cannot set a negative value.
*/
void setRedistributorIndex(int v);
}

private static final class BindingsAndPosition extends AtomicReference<Binding[]> implements BindingIndex {

private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextPosition");
private static final AtomicIntegerFieldUpdater<BindingsAndPosition> NEXT_REDISTRIBUTION_POSITION_UPDATER = AtomicIntegerFieldUpdater.newUpdater(BindingsAndPosition.class, "nextRedistributionPosition");

public volatile int nextPosition;
public volatile int nextRedistributionPosition;

BindingsAndPosition(Binding[] bindings) {
super(bindings);
NEXT_POSITION_UPDATER.lazySet(this, 0);
NEXT_REDISTRIBUTION_POSITION_UPDATER.lazySet(this, 0);
}

@Override
public int getIndex() {
return nextPosition;
}

@Override
public int getRedistributorIndex() {
return nextRedistributionPosition;
}

@Override
public void setIndex(int v) {
if (v < 0) {
throw new IllegalArgumentException("cannot set a negative position");
}
NEXT_POSITION_UPDATER.lazySet(this, v);
}

@Override
public void setRedistributorIndex(int v) {
if (v < 0) {
throw new IllegalArgumentException("cannot set a negative position");
}
NEXT_REDISTRIBUTION_POSITION_UPDATER.lazySet(this, v);
}
}

private final ConcurrentHashMap<SimpleString, BindingsAndPosition> map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,40 @@ public void testRedistributionToRemoteConsumerFromNewQueueLbOffWithRedistributio

}

@Test
public void testEvenRedistributionLbOffWithRedistribution() throws Exception {
final int messageCount = 1000;
final String queue = "queues.test";

setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
startServers(0, 1, 2);

setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());

createQueue(0, queue, queue, null, false, RoutingType.ANYCAST);
createQueue(1, queue, queue, null, false, RoutingType.ANYCAST);
createQueue(2, queue, queue, null, false, RoutingType.ANYCAST);

addConsumer(0, 1, queue, null);
addConsumer(1, 2, queue, null);

waitForBindings(0, queue, 1, 0, true);
waitForBindings(1, queue, 1, 1, true);
waitForBindings(2, queue, 1, 1, true);

waitForBindings(0, queue, 2, 2, false);
waitForBindings(1, queue, 2, 1, false);
waitForBindings(2, queue, 2, 1, false);

send(0, queue, messageCount * 2, false, null);

Wait.assertEquals(0L, () -> servers[0].getTotalMessageCount(), 5000, 100);
Assert.assertEquals(messageCount, servers[1].getTotalMessageCount());
Assert.assertEquals(messageCount, servers[2].getTotalMessageCount());
}

@Test
public void testRedistributionToRemoteMulticastConsumerLbOffWithRedistribution() throws Exception {

Expand Down

0 comments on commit 067eaa0

Please sign in to comment.