From c5c0ec9e0dc1a47f9902179b4bb554c7a8a59315 Mon Sep 17 00:00:00 2001 From: scottf Date: Mon, 9 Sep 2024 12:47:19 -0400 Subject: [PATCH] Improving Multi Tool reports --- .../main/java/io/nats/jsmulti/JsMulti.java | 12 ++++- .../io/nats/jsmulti/examples/Consumer.java | 9 ++-- .../io/nats/jsmulti/examples/Producer.java | 12 ++--- .../java/io/nats/jsmulti/settings/Action.java | 54 +++++++++---------- .../io/nats/jsmulti/settings/Context.java | 5 +- .../io/nats/jsmulti/shared/ProfileStats.java | 6 +-- .../java/io/nats/jsmulti/shared/Stats.java | 20 +++---- 7 files changed, 60 insertions(+), 58 deletions(-) diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java b/js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java index aaa5860..ae0e8ee 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java @@ -486,8 +486,18 @@ private static void _jsSyncConsume(Context ctx, Stats stats, String durable, Syn private static void subSimple(Context ctx, Connection nc, Stats stats, int id) throws Exception { // Really only need to lock when queueing b/c it's the same durable... // ... to ensure protection from multiple threads trying to make the same consumer + String stream = ctx.stream; + if (stream == null) { + List streamNames = nc.jetStreamManagement(ctx.getJetStreamOptions()).getStreamNames(ctx.subject); + if (streamNames.size() == 1) { + stream = streamNames.get(0); + } + else { + throw new TerminalException("Action requires stream name"); + } + } String durable = ctx.getSubDurable(id); - StreamContext streamContext = nc.getStreamContext(ctx.stream); + StreamContext streamContext = nc.getStreamContext(stream); ConsumerContext cc; synchronized (CREATE_CONSUMER_LOCK) { cc = streamContext.createOrUpdateConsumer( diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Consumer.java b/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Consumer.java index 2dabe9c..24917eb 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Consumer.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Consumer.java @@ -42,12 +42,13 @@ public static void main(String[] args) throws Exception { // You could code this to use args to create the Arguments Arguments a = Arguments.instance() .server(SERVER) + .stream(STREAM) .subject(SUBJECT) - .action(Action.SUB_PULL_QUEUE) // could be Action.SUB_PULL_READ for example - .messageCount(50_000) // default is 100_000. Consumer needs this to know when to stop. + .action(Action.SUB_FETCH_QUEUE) // could be Action.SUB_PULL_READ for example + .messageCount(600_000) // default is 100_000. Consumer needs this to know when to stop. // .ackPolicy(AckPolicy.None) // default is AckPolicy.Explicit which is the only policy allowed for PULL at the moment // .ackAllFrequency(20) // for AckPolicy.All how many message to wait before acking, DEFAULT IS 1 - .batchSize(20) // default is 10 only used with pull subs + // .batchSize(20) // default is 10 only used with pull subs .threads(3) // default is 1 .individualConnection() // versus .sharedConnection() // .reportFrequency(500) // default is 10% of message count @@ -62,7 +63,7 @@ public static void main(String[] args) throws Exception { // Uncomment for latency runs. The stream needs to exist // before the consumers start. // ----------------------------------------------------- - StreamUtils.setupStream(STREAM, ctx); +// StreamUtils.setupStream(STREAM, ctx); JsMulti.run(ctx, true, true); } diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Producer.java b/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Producer.java index f8480c9..de5cd12 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Producer.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/examples/Producer.java @@ -46,11 +46,11 @@ public static void main(String[] args) throws Exception { .server(SERVER) .subject(SUBJECT) .action(Action.PUB_SYNC) // or Action.PUB_ASYNC or Action.PUB_CORE for example - .latencyFlag(LATENCY_RUN) // tells the code to add latency info to the header - .messageCount(50_000) // default is 100_000 - .payloadSize(0) // default is 128 +// .latencyFlag(LATENCY_RUN) // tells the code to add latency info to the header + .messageCount(1_000_000) // default is 100_000 + .payloadSize(0) // default is 128 .roundSize(50) // how often to check Async Publish Acks, default is 100 - .threads(3) // default is 1 + .threads(1) // default is 1 .individualConnection() // versus .sharedConnection() // .reportFrequency(500) // default is 10% of message count ; @@ -64,9 +64,9 @@ public static void main(String[] args) throws Exception { // stream because the stream needs to exist before the // consumers start. // --------------------------------------------------- - if (!LATENCY_RUN) { +// if (!LATENCY_RUN) { StreamUtils.setupStream(STREAM, ctx); - } +// } JsMulti.run(ctx); } diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Action.java b/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Action.java index f7395bd..3b92615 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Action.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Action.java @@ -14,35 +14,35 @@ package io.nats.jsmulti.settings; public enum Action { - PUB_SYNC( "PubSync", true, true, false, false, false, false), - PUB_ASYNC( "PubAsync", true, false, false, false, false, false), - PUB_CORE( "PubCore", true, true, false, false, false, false), - PUB( "Pub", true, true, true, false, false, false), + PUB_SYNC( "PubSync", true, true, false, false, false), + PUB_ASYNC( "PubAsync", true, false, false, false, false), + PUB_CORE( "PubCore", true, true, false, false, false), + PUB( "Pub", true, true, true, false, false), - REQUEST( "Request", true, true, true, false, false, false), - // REQUEST_ASYNC( "RequestAsync", true, false, true, false, false), - REPLY( "Reply", false, false, true, true, false, false), + REQUEST( "Request", true, true, true, false, false), + // REQUEST_ASYNC( "RequestAsync", true, false, true, false), + REPLY( "Reply", false, false, true, true, false), - SUB_CORE( "SubCore", false, false, true, true, false, false), - SUB_CORE_QUEUE( "SubCoreQueue", false, false, true, true, true, false), + SUB_CORE( "SubCore", false, false, true, true, false), + SUB_CORE_QUEUE( "SubCoreQueue", false, false, true, true, true), - SUB_PUSH( "SubPush", false, false, false, true, false, false), - SUB_QUEUE( "SubQueue", false, false, false, true, true, false), - SUB_PULL( "SubPull", false, false, false, false, false, false), - SUB_PULL_READ( "SubPullRead", false, false, false, false, false, false), // read is manual continuous sync pull (pre-simplification) - SUB_PULL_QUEUE( "SubPullQueue", false, false, false, false, true, false), - SUB_PULL_READ_QUEUE( "SubPullReadQueue", false, false, false, false, false, false), // read is manual continuous sync pull (pre-simplification) + SUB_PUSH( "SubPush", false, false, false, true, false), + SUB_QUEUE( "SubQueue", false, false, false, true, true), + SUB_PULL( "SubPull", false, false, false, false, false), + SUB_PULL_READ( "SubPullRead", false, false, false, false, false), // read is manual continuous sync pull (pre-simplification) + SUB_PULL_QUEUE( "SubPullQueue", false, false, false, false, true), + SUB_PULL_READ_QUEUE( "SubPullReadQueue", false, false, false, false, false), // read is manual continuous sync pull (pre-simplification) // simplification - SUB_FETCH( "SubFetch", false, false, false, false, false, true), - SUB_ITERATE( "SubIterate", false, false, false, false, false, true), - // SUB_CONSUME( "SubConsume", false, false, false, false, false, true), - SUB_FETCH_QUEUE( "SubFetchQueue", false, false, false, false, true, true), - SUB_ITERATE_QUEUE( "SubIterateQueue", false, false, false, false, true, true), - // SUB_CONSUME_QUEUE( "SubConsumeQueue", false, false, false, false, true, true), + SUB_FETCH( "SubFetch", false, false, false, false, false), + SUB_ITERATE( "SubIterate", false, false, false, false, false), + // SUB_CONSUME( "SubConsume", false, false, false, false, false), + SUB_FETCH_QUEUE( "SubFetchQueue", false, false, false, false, true), + SUB_ITERATE_QUEUE( "SubIterateQueue", false, false, false, false, true), + // SUB_CONSUME_QUEUE( "SubConsumeQueue", false, false, false, false, true), - RTT( "RTT", true, true, true, false, false, false), - CUSTOM( "CUSTOM", false, false, false, false, false, false); + RTT( "RTT", true, true, true, false, false), + CUSTOM( "CUSTOM", false, false, false, false, false); private final String label; private final boolean pubAction; @@ -52,9 +52,8 @@ public enum Action { private final boolean push; private final boolean pull; private final boolean queue; - private final boolean requiresStreamName; - Action(String label, boolean pub, boolean pubSync, boolean regularCore, boolean push, boolean queue, boolean requiresStreamName) { + Action(String label, boolean pub, boolean pubSync, boolean regularCore, boolean push, boolean queue) { this.label = label; this.pubAction = pub; this.pubSync = pubSync; @@ -63,7 +62,6 @@ public enum Action { this.push = push; this.pull = !regularCore && !push; this.queue = queue; - this.requiresStreamName = requiresStreamName; } public static Action getInstance(String text) { @@ -111,10 +109,6 @@ public boolean isQueue() { return queue; } - public boolean requiresStreamName() { - return requiresStreamName; - } - @Override public String toString() { return label; diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Context.java b/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Context.java index d7bf1f6..054521b 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Context.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/settings/Context.java @@ -438,9 +438,6 @@ public Context(String[] args) { if (_action == null) { error("Valid action required!"); } - else if (_action.requiresStreamName() && _stream == null) { - error("Action requires stream name"); - } else if (_messageCount < 1) { error("Message count required!"); } @@ -555,7 +552,7 @@ private Object classForName(String className, String label) { } } - private void error(String errMsg) { + public void error(String errMsg) { if (app == null) { System.err.println("ERROR: " + errMsg); System.exit(-1); diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/shared/ProfileStats.java b/js-multi-tool/src/main/java/io/nats/jsmulti/shared/ProfileStats.java index 13b0746..818916d 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/shared/ProfileStats.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/shared/ProfileStats.java @@ -222,9 +222,9 @@ private static boolean equivalent(List l1, List l2) return true; } - public static final String REPORT_SEP_LINE = "| -------------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ------- | ------- |"; - public static final String REPORT_LINE_HEADER = "| %-14s | max | heap max | allocated | free | heap used | heap cmtd | non used | non cmtd | alive | dead |\n"; - public static final String REPORT_LINE_FORMAT = "| %-14s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %7s | %7s |\n"; + public static final String REPORT_SEP_LINE = "| ------------------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ------- | ------- |"; + public static final String REPORT_LINE_HEADER = "| %-19s | max | heap max | allocated | free | heap used | heap cmtd | non used | non cmtd | alive | dead |\n"; + public static final String REPORT_LINE_FORMAT = "| %-19s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %7s | %7s |\n"; public static void report(List list) { for (int x = 0; x < list.size(); x++) { diff --git a/js-multi-tool/src/main/java/io/nats/jsmulti/shared/Stats.java b/js-multi-tool/src/main/java/io/nats/jsmulti/shared/Stats.java index 2ce4153..48fb9dd 100644 --- a/js-multi-tool/src/main/java/io/nats/jsmulti/shared/Stats.java +++ b/js-multi-tool/src/main/java/io/nats/jsmulti/shared/Stats.java @@ -32,21 +32,21 @@ public class Stats { public static final String[] HUMAN_BYTES_UNITS = new String[] {"b", "kb", "mb", "gb", "tb", "pb", "eb"}; public static final String ZEROS = "000000000"; - public static final String REPORT_SEP_LINE = "| --------------- | ----------------- | --------------- | ------------------------ | ---------------- |"; - public static final String REPORT_LINE_HEADER = "| %-15s | count | time | msgs/sec | bytes/sec |\n"; - public static final String REPORT_LINE_FORMAT = "| %-15s | %12s msgs | %12s ms | %15s msgs/sec | %12s/sec |\n"; + public static final String REPORT_SEP_LINE = "| ------------------- | ----------------- | --------------- | ------------------------ | ---------------- |"; + public static final String REPORT_LINE_HEADER = "| %-19s | count | time | msgs/sec | bytes/sec |\n"; + public static final String REPORT_LINE_FORMAT = "| %-19s | %12s msgs | %12s ms | %15s msgs/sec | %12s/sec |\n"; - public static final String RTT_REPORT_SEP_LINE = "| --------------- | ------------ | ------------------ | ------------------ |"; - public static final String RTT_REPORT_LINE_HEADER = "| %-15s | count | total time | average time |\n"; - public static final String RTT_REPORT_LINE_FORMAT = "| %-15s | %12s | %12s ms | %15s ms |\n"; + public static final String RTT_REPORT_SEP_LINE = "| ------------------- | ------------ | ------------------ | ------------------ |"; + public static final String RTT_REPORT_LINE_HEADER = "| %-19s | count | total time | average time |\n"; + public static final String RTT_REPORT_LINE_FORMAT = "| %-19s | %12s | %12s ms | %15s ms |\n"; - public static final String LT_REPORT_SEP_LINE = "| --------------- | ------------------------ | ---------------- | ------------------------ | ---------------- | ------------------------ | ---------------- |"; + public static final String LT_REPORT_SEP_LINE = "| ------------------- | ------------------------ | ---------------- | ------------------------ | ---------------- | ------------------------ | ---------------- |"; public static final String LT_REPORT_LINE_HEADER = "| Latency Total | Publish to Server Created | Server Created to Consumer Received | Publish to Consumer Received |"; - public static final String LT_REPORT_LINE_FORMAT = "| %-15s | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec |\n"; + public static final String LT_REPORT_LINE_FORMAT = "| %-19s | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec |\n"; - public static final String LM_REPORT_SEP_LINE = "| ----------------- | ------------------- | ------------------- | ------------------- |"; + public static final String LM_REPORT_SEP_LINE = "| ------------------- | ------------------- | ------------------- | ------------------- |"; public static final String LM_REPORT_LINE_HEADER = "| Latency Message | Publish to Server | Server to Consumer | Publish to Consumer |"; - public static final String LM_REPORT_LINE_FORMAT = "| %17s | %15s ms | %15s ms | %15s ms |\n"; + public static final String LM_REPORT_LINE_FORMAT = "| %19s | %15s ms | %15s ms | %15s ms |\n"; public static final String LCSV_HEADER = "Publish Time,Server Time,Received Time,Publish to Server,Server to Consumer,Publish to Consumer\n";