Skip to content

Commit

Permalink
Added simplification consuming to Multi Tool
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 8, 2024
1 parent b87c863 commit 69f7042
Show file tree
Hide file tree
Showing 4 changed files with 210 additions and 97 deletions.
163 changes: 112 additions & 51 deletions js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static io.nats.jsmulti.shared.Utils.*;

Expand Down Expand Up @@ -114,38 +115,48 @@ private static void cleanupConsumers(Context ctx) {

private static ActionRunner getRunner(final Context ctx) {
try {
switch (ctx.action) {
case CUSTOM: return ctx.customActionRunner;

case PUB_SYNC: return JsMulti::pubSync;
case PUB_ASYNC: return JsMulti::pubAsync;
case PUB_CORE: return JsMulti::pubCore;
case PUB: return JsMulti::pub;

case RTT: return JsMulti::rtt;

case REQUEST: return JsMulti::request;
case REPLY: return JsMulti::reply;

case SUB_CORE: return JsMulti::subCore;
case SUB_PUSH: return JsMulti::subPush;

case SUB_PULL:
case SUB_PULL_READ:
return JsMulti::subPull;

case SUB_QUEUE:
if (ctx.threads > 1) {
if (!ctx.action.isQueue() || ctx.threads > 1) {

switch (ctx.action) {
case CUSTOM:
return ctx.customActionRunner;

case PUB_SYNC:
return JsMulti::pubSync;
case PUB_ASYNC:
return JsMulti::pubAsync;
case PUB_CORE:
return JsMulti::pubCore;
case PUB:
return JsMulti::pub;

case RTT:
return JsMulti::rtt;

case REQUEST:
return JsMulti::request;
case REPLY:
return JsMulti::reply;

case SUB_CORE:
return JsMulti::subCore;

case SUB_PUSH:
case SUB_QUEUE:
return JsMulti::subPush;
}
break;

case SUB_PULL_QUEUE:
case SUB_PULL_READ_QUEUE:
if (ctx.threads > 1) {
case SUB_PULL:
case SUB_PULL_READ:
case SUB_PULL_QUEUE:
case SUB_PULL_READ_QUEUE:
return JsMulti::subPull;
}
break;

case SUB_ITERATE:
case SUB_FETCH:
case SUB_ITERATE_QUEUE:
case SUB_FETCH_QUEUE:
return JsMulti::subSimple;
}
}
throw new Exception("Invalid Action");
}
Expand Down Expand Up @@ -336,52 +347,59 @@ private static void processFutures(List<CompletableFuture<PublishAck>> futures,
stats.stop();
}

// ----------------------------------------------------------------------------------------------------
// SyncConsumer - Used when consuming sync where you have to call a "next"
// ----------------------------------------------------------------------------------------------------
interface SyncConsumer {
Message next() throws Exception;
}

// ----------------------------------------------------------------------------------------------------
// Push
// ----------------------------------------------------------------------------------------------------
private static final Object QUEUE_LOCK = new Object();
private static final Object CREATE_CONSUMER_LOCK = new Object();

private static void reply(Context ctx, Connection nc, Stats stats, int id) throws Exception {
Subscription sub;
if (ctx.action.isQueue()) {
// if we don't do this, multiple threads will try to make the same consumer because
// when they start, the consumer does not exist. So force them do it one at a time.
synchronized (QUEUE_LOCK) {
synchronized (CREATE_CONSUMER_LOCK) {
sub = nc.subscribe(ctx.subject, ctx.queueName);
}
}
else {
sub = nc.subscribe(ctx.subject);
}

_coreReadLikePush(ctx, stats, ctx.getSubName(id), sub::nextMessage, m -> nc.publish(m.getReplyTo(), m.getData()));
_coreReadLikePush(ctx, stats, ctx.getSubName(id), () -> sub.nextMessage(ctx.readTimeoutDuration), m -> nc.publish(m.getReplyTo(), m.getData()));
}

private static void subCore(Context ctx, Connection nc, Stats stats, int id) throws Exception {
Subscription sub;
if (ctx.action.isQueue()) {
// if we don't do this, multiple threads will try to make the same consumer because
// when they start, the consumer does not exist. So force them do it one at a time.
synchronized (QUEUE_LOCK) {
synchronized (CREATE_CONSUMER_LOCK) {
sub = nc.subscribe(ctx.subject, ctx.queueName);
}
}
else {
sub = nc.subscribe(ctx.subject);
}

_coreReadLikePush(ctx, stats, ctx.getSubName(id), sub::nextMessage, m -> {});
_coreReadLikePush(ctx, stats, ctx.getSubName(id), () -> sub.nextMessage(ctx.readTimeoutDuration), m -> {});
}

private static void _coreReadLikePush(Context ctx, Stats stats, String subName, SimpleReader reader, ResultHandler<Message> rh) throws InterruptedException {
private static void _coreReadLikePush(Context ctx, Stats stats, String subName, SyncConsumer syncConsumer, ResultHandler<Message> rh) throws Exception {
int rcvd = 0;
int unReported = 0;
long noMessageTotalElapsed = 0;
AtomicLong counter = ctx.getSubscribeCounter(subName);
report(ctx, rcvd, "Begin Reading");
while (counter.get() < ctx.messageCount) {
stats.start();
Message m = reader.nextMessage(ctx.readTimeoutDuration);
Message m = syncConsumer.next();
long hold = stats.elapsed();
long received = System.currentTimeMillis();
if (m == null) {
Expand Down Expand Up @@ -411,7 +429,7 @@ private static void subPush(Context ctx, Connection nc, Stats stats, int id) thr
if (ctx.action.isQueue()) {
// if we don't do this, multiple threads will try to make the same consumer because
// when they start, the consumer does not exist. So force them do it one at a time.
synchronized (QUEUE_LOCK) {
synchronized (CREATE_CONSUMER_LOCK) {
sub = js.subscribe(ctx.subject, ctx.queueName,
ConsumerConfiguration.builder()
.ackPolicy(ctx.ackPolicy)
Expand All @@ -430,17 +448,10 @@ private static void subPush(Context ctx, Connection nc, Stats stats, int id) thr
.buildPushSubscribeOptions());
}

_jsReadLikePush(ctx, stats, durable, sub::nextMessage);
}

// ----------------------------------------------------------------------------------------------------
// Simple Reader - Used by push and pull reader
// ----------------------------------------------------------------------------------------------------
interface SimpleReader {
Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException;
_jsSyncConsume(ctx, stats, durable, () -> sub.nextMessage(ctx.readTimeoutDuration));
}

private static void _jsReadLikePush(Context ctx, Stats stats, String durable, SimpleReader reader) throws InterruptedException {
private static void _jsSyncConsume(Context ctx, Stats stats, String durable, SyncConsumer syncConsumer) throws Exception {
int rcvd = 0;
Message lastUnAcked = null;
int unAckedCount = 0;
Expand All @@ -450,7 +461,7 @@ private static void _jsReadLikePush(Context ctx, Stats stats, String durable, Si
report(ctx, rcvd, "Begin Reading");
while (counter.get() < ctx.messageCount) {
stats.start();
Message m = reader.nextMessage(ctx.readTimeoutDuration);
Message m = syncConsumer.next();
long hold = stats.elapsed();
long received = System.currentTimeMillis();
if (m == null) {
Expand Down Expand Up @@ -478,6 +489,53 @@ private static void _jsReadLikePush(Context ctx, Stats stats, String durable, Si
report(ctx, rcvd, "Finished Reading Messages");
}

// ----------------------------------------------------------------------------------------------------
// Simplification
// ----------------------------------------------------------------------------------------------------
private static void subSimple(Context ctx, Connection nc, Stats stats, int id) throws Exception {
// Really only need to lock when queueing b/c it's the same durable...
// ... to ensure protection from multiple threads trying to make the same consumer
String durable = ctx.getSubDurable(id);
StreamContext streamContext = nc.getStreamContext(ctx.stream);
ConsumerContext cc;
synchronized (CREATE_CONSUMER_LOCK) {
cc = streamContext.createOrUpdateConsumer(
ConsumerConfiguration.builder()
.ackPolicy(ctx.ackPolicy)
.ackWait(Duration.ofSeconds(ctx.ackWaitSeconds))
.durable(durable)
.build());
}

if (ctx.action == Action.SUB_FETCH || ctx.action == Action.SUB_FETCH_QUEUE) {
FetchConsumeOptions opts = FetchConsumeOptions.builder().maxMessages(ctx.batchSize).build();
AtomicReference<FetchConsumer> fcRef = new AtomicReference<>();
_jsSyncConsume(ctx, stats, durable, () -> {
if (fcRef.get() == null) {
FetchConsumer fc = cc.fetch(opts);
fcRef.set(fc);
}
Message m = fcRef.get().nextMessage();
if (m == null) {
fcRef.set(null);
}
return m;
});
}
else if (ctx.action == Action.SUB_ITERATE || ctx.action == Action.SUB_ITERATE_QUEUE) {
ConsumeOptions opts = ConsumeOptions.builder().batchSize(ctx.batchSize).build();
try (IterableConsumer ic = cc.iterate(opts)) {
_jsSyncConsume(ctx, stats, durable, () -> ic.nextMessage(ctx.readTimeoutDuration));
}
}
// else if (ctx.action == Action.SUB_CONSUME) {
// TODO
// }
else {
throw new Exception("Action Not Implemented: " + ctx.action.getLabel());
}
}

// ----------------------------------------------------------------------------------------------------
// Pull
// ----------------------------------------------------------------------------------------------------
Expand All @@ -488,7 +546,7 @@ private static void subPull(Context ctx, Connection nc, Stats stats, int id) thr
// ... to ensure protection from multiple threads trying to make the same consumer
String durable = ctx.getSubDurable(id);
JetStreamSubscription sub;
synchronized (QUEUE_LOCK) {
synchronized (CREATE_CONSUMER_LOCK) {
sub = js.subscribe(ctx.subject,
ConsumerConfiguration.builder()
.ackPolicy(ctx.ackPolicy)
Expand All @@ -500,9 +558,12 @@ private static void subPull(Context ctx, Connection nc, Stats stats, int id) thr
if (ctx.action == Action.SUB_PULL || ctx.action == Action.SUB_PULL_QUEUE) {
_subPullFetch(ctx, stats, sub, durable);
}
else {
else if (ctx.action == Action.SUB_PULL_READ) {
_subPullRead(ctx, stats, sub, durable);
}
else {
throw new Exception("Action Not Implemented: " + ctx.action.getLabel());
}
}

private static void _subPullFetch(Context ctx, Stats stats, JetStreamSubscription sub, String durable) {
Expand Down Expand Up @@ -537,9 +598,9 @@ private static void _subPullFetch(Context ctx, Stats stats, JetStreamSubscriptio
report(ctx, rcvd, "Finished Reading Messages");
}

private static void _subPullRead(Context ctx, Stats stats, JetStreamSubscription sub, String durable) throws InterruptedException {
private static void _subPullRead(Context ctx, Stats stats, JetStreamSubscription sub, String durable) throws Exception {
JetStreamReader reader = sub.reader(ctx.batchSize, ctx.batchSize / 4); // repullAt 25% of batch size
_jsReadLikePush(ctx, stats, durable, reader::nextMessage);
_jsSyncConsume(ctx, stats, durable, () -> reader.nextMessage(ctx.readTimeoutDuration));
reader.stop();
}

Expand Down
48 changes: 31 additions & 17 deletions js-multi-tool/src/main/java/io/nats/jsmulti/settings/Action.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,35 @@
package io.nats.jsmulti.settings;

public enum Action {
PUB_SYNC( "PubSync", true, true, false, false, false),
PUB_ASYNC( "PubAsync", true, false, false, false, false),
PUB_CORE( "PubCore", true, true, false, false, false),
PUB( "Pub", true, true, true, false, false),
PUB_SYNC( "PubSync", true, true, false, false, false, false),
PUB_ASYNC( "PubAsync", true, false, false, false, false, false),
PUB_CORE( "PubCore", true, true, false, false, false, false),
PUB( "Pub", true, true, true, false, false, false),

REQUEST( "Request", true, true, true, false, false),
REQUEST( "Request", true, true, true, false, false, false),
// REQUEST_ASYNC( "RequestAsync", true, false, true, false, false),
REPLY( "Reply", false, false, true, true, false),
REPLY( "Reply", false, false, true, true, false, false),

SUB_CORE( "SubCore", false, false, true, true, false),
SUB_CORE_QUEUE( "SubCoreQueue", false, false, true, true, true),
SUB_CORE( "SubCore", false, false, true, true, false, false),
SUB_CORE_QUEUE( "SubCoreQueue", false, false, true, true, true, false),

SUB_PUSH( "SubPush", false, false, false, true, false),
SUB_QUEUE( "SubQueue", false, false, false, true, true),
SUB_PULL( "SubPull", false, false, false, false, false),
SUB_PULL_READ( "SubPullRead", false, false, false, false, false),
SUB_PULL_QUEUE( "SubPullQueue", false, false, false, false, true),
SUB_PULL_READ_QUEUE( "SubPullReadQueue", false, false, false, false, false),
SUB_PUSH( "SubPush", false, false, false, true, false, false),
SUB_QUEUE( "SubQueue", false, false, false, true, true, false),
SUB_PULL( "SubPull", false, false, false, false, false, false),
SUB_PULL_READ( "SubPullRead", false, false, false, false, false, false), // read is manual continuous sync pull (pre-simplification)
SUB_PULL_QUEUE( "SubPullQueue", false, false, false, false, true, false),
SUB_PULL_READ_QUEUE( "SubPullReadQueue", false, false, false, false, false, false), // read is manual continuous sync pull (pre-simplification)

RTT( "RTT", true, true, true, false, false),
CUSTOM( "CUSTOM", false, false, false, false, false);
// simplification
SUB_FETCH( "SubFetch", false, false, false, false, false, true),
SUB_ITERATE( "SubIterate", false, false, false, false, false, true),
// SUB_CONSUME( "SubConsume", false, false, false, false, false, true),
SUB_FETCH_QUEUE( "SubFetchQueue", false, false, false, false, true, true),
SUB_ITERATE_QUEUE( "SubIterateQueue", false, false, false, false, true, true),
// SUB_CONSUME_QUEUE( "SubConsumeQueue", false, false, false, false, true, true),

RTT( "RTT", true, true, true, false, false, false),
CUSTOM( "CUSTOM", false, false, false, false, false, false);

private final String label;
private final boolean pubAction;
Expand All @@ -44,8 +52,9 @@ public enum Action {
private final boolean push;
private final boolean pull;
private final boolean queue;
private final boolean requiresStreamName;

Action(String label, boolean pub, boolean pubSync, boolean regularCore, boolean push, boolean queue) {
Action(String label, boolean pub, boolean pubSync, boolean regularCore, boolean push, boolean queue, boolean requiresStreamName) {
this.label = label;
this.pubAction = pub;
this.pubSync = pubSync;
Expand All @@ -54,6 +63,7 @@ public enum Action {
this.push = push;
this.pull = !regularCore && !push;
this.queue = queue;
this.requiresStreamName = requiresStreamName;
}

public static Action getInstance(String text) {
Expand Down Expand Up @@ -101,6 +111,10 @@ public boolean isQueue() {
return queue;
}

public boolean requiresStreamName() {
return requiresStreamName;
}

@Override
public String toString() {
return label;
Expand Down
Loading

0 comments on commit 69f7042

Please sign in to comment.