Skip to content

Commit

Permalink
Ordered Consumer Heartbeat Handling (#766)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 8, 2022
1 parent 721801e commit ea3c532
Show file tree
Hide file tree
Showing 12 changed files with 412 additions and 295 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/SubscribeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected SubscribeOptions(Builder builder, boolean isPull, boolean isOrdered, S
throw JsSoOrderedReplicasNotSuppliedOrOne.instance();
}
Duration ccHb = builder.cc.getIdleHeartbeat();
if (ccHb != null && ccHb.toMillis() > hb) {
if (ccHb != null) {
hb = ccHb.toMillis();
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,20 @@ void processError(String errorText) {
}
}

interface ErrorListenerCaller {
void call(Connection conn, ErrorListener el);
}

void executeCallback(ErrorListenerCaller elc) {
if (!this.callbackRunner.isShutdown()) {
try {
this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener()));
} catch (RejectedExecutionException re) {
// Timing with shutdown, let it go
}
}
}

void processConnectionEvent(Events type) {
ConnectionListener handler = this.options.getConnectionListener();

Expand Down
59 changes: 24 additions & 35 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,24 +198,23 @@ private Headers _mergeNum(Headers h, String key, String value) {
// ----------------------------------------------------------------------------------------------------
private static final PushSubscribeOptions DEFAULT_PUSH_OPTS = PushSubscribeOptions.builder().build();

static class SidCheckManager extends MessageManager {
@Override
boolean manage(Message msg) {
return !sub.getSID().equals(msg.getSID());
}
}

// Push/PullMessageManagerFactory are internal and used for testing / providing a MessageManager mocks
interface PushMessageManagerFactory {
MessageManager createPushMessageManager(
NatsConnection conn, SubscribeOptions so, ConsumerConfiguration cc, boolean queueMode, boolean syncMode);
NatsConnection conn,
NatsJetStream js,
String stream,
SubscribeOptions so,
ConsumerConfiguration serverCC,
boolean queueMode,
NatsDispatcher dispatcher);
}

interface PullMessageManagerFactory {
MessageManager createPullMessageManager();
}

PushMessageManagerFactory PUSH_MESSAGE_MANAGER_FACTORY = PushMessageManager::new;
PushMessageManagerFactory PUSH_MESSAGE_MANAGER_FACTORY = null;
PullMessageManagerFactory PULL_MESSAGE_MANAGER_FACTORY = PullMessageManager::new;

JetStreamSubscription createSubscription(String subject,
Expand Down Expand Up @@ -380,35 +379,32 @@ else if (so.isBind()) {
// 6. create the subscription. lambda needs final or effectively final vars
NatsJetStreamSubscription sub;
if (isPullMode) {
final MessageManager[] managers = new MessageManager[] { PULL_MESSAGE_MANAGER_FACTORY.createPullMessageManager() };
final MessageManager manager = PULL_MESSAGE_MANAGER_FACTORY.createPullMessageManager();
final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, settledConsumerName, managers);
-> new NatsJetStreamPullSubscription(sid, lSubject, lConn, this, fnlStream, settledConsumerName, manager);
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory);
}
else {
final MessageManager pushMessageManager =
PUSH_MESSAGE_MANAGER_FACTORY.createPushMessageManager(conn, so, settledServerCC, qgroup != null, dispatcher == null);
final MessageManager[] managers;
if (so.isOrdered()) {
managers = new MessageManager[3];
managers[0] = new SidCheckManager();
managers[1] = pushMessageManager;
managers[2] = new OrderedManager(this, dispatcher, fnlStream, settledServerCC);
final MessageManager manager;
if (PUSH_MESSAGE_MANAGER_FACTORY != null) {
manager = PUSH_MESSAGE_MANAGER_FACTORY.createPushMessageManager(conn, this, fnlStream, so, settledServerCC, qgroup != null, dispatcher);
}
else if (so.isOrdered()) {
manager = new OrderedManager(conn, this, fnlStream, so, settledServerCC, qgroup != null, dispatcher);
}
else {
managers = new MessageManager[1];
managers[0] = pushMessageManager;
manager = new PushMessageManager(conn, this, fnlStream, so, settledServerCC, qgroup != null, dispatcher);
}

final NatsSubscriptionFactory factory = (sid, lSubject, lQgroup, lConn, lDispatcher)
-> new NatsJetStreamSubscription(sid, lSubject, lQgroup, lConn, lDispatcher,
this, fnlStream, settledConsumerName, managers);
this, fnlStream, settledConsumerName, manager);

if (dispatcher == null) {
sub = (NatsJetStreamSubscription) conn.createSubscription(fnlInboxDeliver, qgroup, null, factory);
}
else {
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, settledServerCC, managers);
AsyncMessageHandler handler = new AsyncMessageHandler(userHandler, isAutoAck, settledServerCC, manager);
sub = (NatsJetStreamSubscription) dispatcher.subscribeImplJetStream(fnlInboxDeliver, qgroup, handler, factory);
}
}
Expand Down Expand Up @@ -471,30 +467,23 @@ public List<String> getChanges(ConsumerConfiguration serverCc) {
}

static class AsyncMessageHandler implements MessageHandler {
List<MessageManager> managers;
MessageManager manager;
List<MessageHandler> handlers;

public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration settledServerCC, MessageManager ... managers) {
public AsyncMessageHandler(MessageHandler userHandler, boolean isAutoAck, ConsumerConfiguration settledServerCC, MessageManager manager) {
handlers = new ArrayList<>();
handlers.add(userHandler);
if (isAutoAck && settledServerCC.getAckPolicy() != AckPolicy.None) {
handlers.add(Message::ack);
};

this.managers = new ArrayList<>();
for (MessageManager mm : managers) {
if (mm != null) {
this.managers.add(mm);
}
}
this.manager = manager;
}

@Override
public void onMessage(Message msg) throws InterruptedException {
for (MessageManager mm : managers) {
if (mm.manage(msg)) {
return;
}
if (manager.manage(msg)) {
return;
}

for (MessageHandler mh : handlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class NatsJetStreamPullSubscription extends NatsJetStreamSubscription {
NatsConnection connection,
NatsJetStream js,
String stream, String consumer,
MessageManager[] managers) {
super(sid, subject, null, connection, null, js, stream, consumer, managers);
MessageManager manager) {
super(sid, subject, null, connection, null, js, stream, consumer, manager);
}

@Override
Expand Down Expand Up @@ -143,7 +143,7 @@ private List<Message> _fetch(int batchSize, long maxWaitMillis) {
if (msg == null) {
return messages; // normal timeout
}
if (!anyManaged(msg)) { // not null and not managed means JS Message
if (!manager.manage(msg)) { // not null and not managed means JS Message
messages.add(msg);
batchLeft--;
}
Expand All @@ -162,7 +162,7 @@ private List<Message> drainAlreadyBuffered(int batchSize) {
try {
Message msg = nextMessageInternal(null); // null means do not wait, it's either already here or not
while (msg != null) {
if (!anyManaged(msg)) { // not null and not managed means JS Message
if (!manager.manage(msg)) { // not null and not managed means JS Message
messages.add(msg);
if (messages.size() == batchSize) {
return messages;
Expand Down
29 changes: 8 additions & 21 deletions src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,22 @@ public class NatsJetStreamSubscription extends NatsSubscription implements JetSt
protected String stream;
protected String consumerName;

protected MessageManager[] managers;
protected MessageManager manager;

NatsJetStreamSubscription(String sid, String subject, String queueName,
NatsConnection connection, NatsDispatcher dispatcher,
NatsJetStream js,
String stream, String consumer,
MessageManager[] managers)
MessageManager manager)
{
super(sid, subject, queueName, connection, dispatcher);

this.js = js;
this.stream = stream;
this.consumerName = consumer; // might be null, someone will call setConsumerName

this.managers = managers;
for (MessageManager mm : managers) {
mm.startup(this);
}
this.manager = manager;
manager.startup(this);
}

void setConsumerName(String consumerName) {
Expand All @@ -70,13 +68,11 @@ boolean isPullMode() {
return false;
}

MessageManager[] getManagers() { return managers; } // internal, for testing
MessageManager getManager() { return manager; } // internal, for testing

@Override
void invalidate() {
for (MessageManager mm : managers) {
mm.shutdown();
}
manager.shutdown();
super.invalidate();
}

Expand All @@ -103,7 +99,7 @@ protected Message _nextUnmanagedNullOrLteZero(Duration timeout) throws Interrupt
// until we get an actual no (null) message or we get a message
// that the managers do not handle
Message msg = nextMessageInternal(timeout);
while (msg != null && anyManaged(msg)) {
while (msg != null && manager.manage(msg)) {
msg = nextMessageInternal(timeout);
}
return msg;
Expand All @@ -124,7 +120,7 @@ protected Message _nextUnmanaged(long timeout) throws InterruptedException {
if (msg == null) {
return null; // normal timeout
}
if (!anyManaged(msg)) { // not managed means JS Message
if (!manager.manage(msg)) { // not managed means JS Message
return msg;
}
// managed so try again while we have time
Expand All @@ -133,15 +129,6 @@ protected Message _nextUnmanaged(long timeout) throws InterruptedException {
return null;
}

boolean anyManaged(Message msg) {
for (MessageManager mm : managers) {
if (mm.manage(msg)) {
return true;
}
}
return false;
}

/**
* {@inheritDoc}
*/
Expand Down
97 changes: 45 additions & 52 deletions src/main/java/io/nats/client/impl/OrderedManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,73 +14,66 @@
package io.nats.client.impl;

import io.nats.client.Message;
import io.nats.client.SubscribeOptions;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;

class OrderedManager extends MessageManager {
class OrderedManager extends PushMessageManager {

private final NatsJetStream js;
private final NatsDispatcher dispatcher;
private final String stream;
private final ConsumerConfiguration serverCC;

private long lastStreamSeq;
private long expectedConsumerSeq;

OrderedManager(NatsJetStream js, NatsDispatcher dispatcher, String stream, ConsumerConfiguration serverCC) {
this.js = js;
this.dispatcher = dispatcher;
this.stream = stream;
this.serverCC = serverCC;
lastStreamSeq = -1;
OrderedManager(NatsConnection conn, NatsJetStream js, String stream, SubscribeOptions so, ConsumerConfiguration serverCC, boolean queueMode, NatsDispatcher dispatcher) {
super(conn, js, stream, so, serverCC, queueMode, dispatcher);
expectedConsumerSeq = 1; // always starts at 1
}

@Override
boolean manage(Message msg) {
if (msg != null) {
long receivedConsumerSeq = msg.metaData().consumerSequence();
if (expectedConsumerSeq != receivedConsumerSeq) {
try {
expectedConsumerSeq = 1; // consumer always starts with consumer sequence 1
protected boolean subManage(Message msg) {
long receivedConsumerSeq = msg.metaData().consumerSequence();
if (expectedConsumerSeq != receivedConsumerSeq) {
handleErrorCondition();
return true;
}
expectedConsumerSeq++;
return false;
}

@Override
protected void handleHeartbeatError() {
handleErrorCondition();
}

private void handleErrorCondition() {
try {
expectedConsumerSeq = 1; // consumer always starts with consumer sequence 1

// 1. shutdown the managers, for instance stops heartbeat timers
for (MessageManager mm : sub.managers) {
mm.shutdown();
}
// 1. shutdown the manager, for instance stops heartbeat timers
sub.manager.shutdown();

// 2. re-subscribe. This means kill the sub then make a new one
// New sub needs a new deliver subject
String newDeliverSubject = sub.connection.createInbox();
sub.reSubscribe(newDeliverSubject);
// 2. re-subscribe. This means kill the sub then make a new one
// New sub needs a new deliverSubject
String newDeliverSubject = sub.connection.createInbox();
sub.reSubscribe(newDeliverSubject);

// 3. make a new consumer using the same deliver subject but
// with a new starting point
ConsumerConfiguration userCC = ConsumerConfiguration.builder(serverCC)
.deliverPolicy(DeliverPolicy.ByStartSequence)
.deliverSubject(newDeliverSubject)
.startSequence(lastStreamSeq + 1)
.startTime(null) // clear start time in case it was originally set
.build();
js._createConsumerUnsubscribeOnException(stream, userCC, sub);
// 3. make a new consumer using the same deliver subject but
// with a new starting point
ConsumerConfiguration userCC = ConsumerConfiguration.builder(serverCC)
.deliverPolicy(DeliverPolicy.ByStartSequence)
.deliverSubject(newDeliverSubject)
.startSequence(Math.max(1, lastStreamSeq + 1))
.startTime(null) // clear start time in case it was originally set
.build();
js._createConsumerUnsubscribeOnException(stream, userCC, sub);

// 4. re start the managers.
for (MessageManager mm : sub.managers) {
mm.startup(sub);
}
}
catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Ordered subscription fatal error.", e);
js.conn.processException(ise);
if (dispatcher == null) { // synchronous
throw ise;
}
}
return true;
// 4. restart the manager.
sub.manager.startup(sub);
}
catch (Exception e) {
IllegalStateException ise = new IllegalStateException("Ordered subscription fatal error.", e);
js.conn.processException(ise);
if (dispatcher == null) { // synchronous
throw ise;
}
lastStreamSeq = msg.metaData().streamSequence();
expectedConsumerSeq++;
}
return false;
}
}
Loading

0 comments on commit ea3c532

Please sign in to comment.