Skip to content

Commit

Permalink
Improving Multi Tool reports
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 9, 2024
1 parent 4ac187d commit c5c0ec9
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 58 deletions.
12 changes: 11 additions & 1 deletion js-multi-tool/src/main/java/io/nats/jsmulti/JsMulti.java
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,18 @@ private static void _jsSyncConsume(Context ctx, Stats stats, String durable, Syn
private static void subSimple(Context ctx, Connection nc, Stats stats, int id) throws Exception {
// Really only need to lock when queueing b/c it's the same durable...
// ... to ensure protection from multiple threads trying to make the same consumer
String stream = ctx.stream;
if (stream == null) {
List<String> streamNames = nc.jetStreamManagement(ctx.getJetStreamOptions()).getStreamNames(ctx.subject);
if (streamNames.size() == 1) {
stream = streamNames.get(0);
}
else {
throw new TerminalException("Action requires stream name");
}
}
String durable = ctx.getSubDurable(id);
StreamContext streamContext = nc.getStreamContext(ctx.stream);
StreamContext streamContext = nc.getStreamContext(stream);
ConsumerContext cc;
synchronized (CREATE_CONSUMER_LOCK) {
cc = streamContext.createOrUpdateConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ public static void main(String[] args) throws Exception {
// You could code this to use args to create the Arguments
Arguments a = Arguments.instance()
.server(SERVER)
.stream(STREAM)
.subject(SUBJECT)
.action(Action.SUB_PULL_QUEUE) // could be Action.SUB_PULL_READ for example
.messageCount(50_000) // default is 100_000. Consumer needs this to know when to stop.
.action(Action.SUB_FETCH_QUEUE) // could be Action.SUB_PULL_READ for example
.messageCount(600_000) // default is 100_000. Consumer needs this to know when to stop.
// .ackPolicy(AckPolicy.None) // default is AckPolicy.Explicit which is the only policy allowed for PULL at the moment
// .ackAllFrequency(20) // for AckPolicy.All how many message to wait before acking, DEFAULT IS 1
.batchSize(20) // default is 10 only used with pull subs
// .batchSize(20) // default is 10 only used with pull subs
.threads(3) // default is 1
.individualConnection() // versus .sharedConnection()
// .reportFrequency(500) // default is 10% of message count
Expand All @@ -62,7 +63,7 @@ public static void main(String[] args) throws Exception {
// Uncomment for latency runs. The stream needs to exist
// before the consumers start.
// -----------------------------------------------------
StreamUtils.setupStream(STREAM, ctx);
// StreamUtils.setupStream(STREAM, ctx);

JsMulti.run(ctx, true, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ public static void main(String[] args) throws Exception {
.server(SERVER)
.subject(SUBJECT)
.action(Action.PUB_SYNC) // or Action.PUB_ASYNC or Action.PUB_CORE for example
.latencyFlag(LATENCY_RUN) // tells the code to add latency info to the header
.messageCount(50_000) // default is 100_000
.payloadSize(0) // default is 128
// .latencyFlag(LATENCY_RUN) // tells the code to add latency info to the header
.messageCount(1_000_000) // default is 100_000
.payloadSize(0) // default is 128
.roundSize(50) // how often to check Async Publish Acks, default is 100
.threads(3) // default is 1
.threads(1) // default is 1
.individualConnection() // versus .sharedConnection()
// .reportFrequency(500) // default is 10% of message count
;
Expand All @@ -64,9 +64,9 @@ public static void main(String[] args) throws Exception {
// stream because the stream needs to exist before the
// consumers start.
// ---------------------------------------------------
if (!LATENCY_RUN) {
// if (!LATENCY_RUN) {
StreamUtils.setupStream(STREAM, ctx);
}
// }

JsMulti.run(ctx);
}
Expand Down
54 changes: 24 additions & 30 deletions js-multi-tool/src/main/java/io/nats/jsmulti/settings/Action.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,35 @@
package io.nats.jsmulti.settings;

public enum Action {
PUB_SYNC( "PubSync", true, true, false, false, false, false),
PUB_ASYNC( "PubAsync", true, false, false, false, false, false),
PUB_CORE( "PubCore", true, true, false, false, false, false),
PUB( "Pub", true, true, true, false, false, false),
PUB_SYNC( "PubSync", true, true, false, false, false),
PUB_ASYNC( "PubAsync", true, false, false, false, false),
PUB_CORE( "PubCore", true, true, false, false, false),
PUB( "Pub", true, true, true, false, false),

REQUEST( "Request", true, true, true, false, false, false),
// REQUEST_ASYNC( "RequestAsync", true, false, true, false, false),
REPLY( "Reply", false, false, true, true, false, false),
REQUEST( "Request", true, true, true, false, false),
// REQUEST_ASYNC( "RequestAsync", true, false, true, false),
REPLY( "Reply", false, false, true, true, false),

SUB_CORE( "SubCore", false, false, true, true, false, false),
SUB_CORE_QUEUE( "SubCoreQueue", false, false, true, true, true, false),
SUB_CORE( "SubCore", false, false, true, true, false),
SUB_CORE_QUEUE( "SubCoreQueue", false, false, true, true, true),

SUB_PUSH( "SubPush", false, false, false, true, false, false),
SUB_QUEUE( "SubQueue", false, false, false, true, true, false),
SUB_PULL( "SubPull", false, false, false, false, false, false),
SUB_PULL_READ( "SubPullRead", false, false, false, false, false, false), // read is manual continuous sync pull (pre-simplification)
SUB_PULL_QUEUE( "SubPullQueue", false, false, false, false, true, false),
SUB_PULL_READ_QUEUE( "SubPullReadQueue", false, false, false, false, false, false), // read is manual continuous sync pull (pre-simplification)
SUB_PUSH( "SubPush", false, false, false, true, false),
SUB_QUEUE( "SubQueue", false, false, false, true, true),
SUB_PULL( "SubPull", false, false, false, false, false),
SUB_PULL_READ( "SubPullRead", false, false, false, false, false), // read is manual continuous sync pull (pre-simplification)
SUB_PULL_QUEUE( "SubPullQueue", false, false, false, false, true),
SUB_PULL_READ_QUEUE( "SubPullReadQueue", false, false, false, false, false), // read is manual continuous sync pull (pre-simplification)

// simplification
SUB_FETCH( "SubFetch", false, false, false, false, false, true),
SUB_ITERATE( "SubIterate", false, false, false, false, false, true),
// SUB_CONSUME( "SubConsume", false, false, false, false, false, true),
SUB_FETCH_QUEUE( "SubFetchQueue", false, false, false, false, true, true),
SUB_ITERATE_QUEUE( "SubIterateQueue", false, false, false, false, true, true),
// SUB_CONSUME_QUEUE( "SubConsumeQueue", false, false, false, false, true, true),
SUB_FETCH( "SubFetch", false, false, false, false, false),
SUB_ITERATE( "SubIterate", false, false, false, false, false),
// SUB_CONSUME( "SubConsume", false, false, false, false, false),
SUB_FETCH_QUEUE( "SubFetchQueue", false, false, false, false, true),
SUB_ITERATE_QUEUE( "SubIterateQueue", false, false, false, false, true),
// SUB_CONSUME_QUEUE( "SubConsumeQueue", false, false, false, false, true),

RTT( "RTT", true, true, true, false, false, false),
CUSTOM( "CUSTOM", false, false, false, false, false, false);
RTT( "RTT", true, true, true, false, false),
CUSTOM( "CUSTOM", false, false, false, false, false);

private final String label;
private final boolean pubAction;
Expand All @@ -52,9 +52,8 @@ public enum Action {
private final boolean push;
private final boolean pull;
private final boolean queue;
private final boolean requiresStreamName;

Action(String label, boolean pub, boolean pubSync, boolean regularCore, boolean push, boolean queue, boolean requiresStreamName) {
Action(String label, boolean pub, boolean pubSync, boolean regularCore, boolean push, boolean queue) {
this.label = label;
this.pubAction = pub;
this.pubSync = pubSync;
Expand All @@ -63,7 +62,6 @@ public enum Action {
this.push = push;
this.pull = !regularCore && !push;
this.queue = queue;
this.requiresStreamName = requiresStreamName;
}

public static Action getInstance(String text) {
Expand Down Expand Up @@ -111,10 +109,6 @@ public boolean isQueue() {
return queue;
}

public boolean requiresStreamName() {
return requiresStreamName;
}

@Override
public String toString() {
return label;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,6 @@ public Context(String[] args) {
if (_action == null) {
error("Valid action required!");
}
else if (_action.requiresStreamName() && _stream == null) {
error("Action requires stream name");
}
else if (_messageCount < 1) {
error("Message count required!");
}
Expand Down Expand Up @@ -555,7 +552,7 @@ private Object classForName(String className, String label) {
}
}

private void error(String errMsg) {
public void error(String errMsg) {
if (app == null) {
System.err.println("ERROR: " + errMsg);
System.exit(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ private static boolean equivalent(List<String> l1, List<String> l2)
return true;
}

public static final String REPORT_SEP_LINE = "| -------------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ------- | ------- |";
public static final String REPORT_LINE_HEADER = "| %-14s | max | heap max | allocated | free | heap used | heap cmtd | non used | non cmtd | alive | dead |\n";
public static final String REPORT_LINE_FORMAT = "| %-14s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %7s | %7s |\n";
public static final String REPORT_SEP_LINE = "| ------------------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ---------- | ------- | ------- |";
public static final String REPORT_LINE_HEADER = "| %-19s | max | heap max | allocated | free | heap used | heap cmtd | non used | non cmtd | alive | dead |\n";
public static final String REPORT_LINE_FORMAT = "| %-19s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %10s | %7s | %7s |\n";

public static void report(List<ProfileStats> list) {
for (int x = 0; x < list.size(); x++) {
Expand Down
20 changes: 10 additions & 10 deletions js-multi-tool/src/main/java/io/nats/jsmulti/shared/Stats.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,21 @@ public class Stats {
public static final String[] HUMAN_BYTES_UNITS = new String[] {"b", "kb", "mb", "gb", "tb", "pb", "eb"};
public static final String ZEROS = "000000000";

public static final String REPORT_SEP_LINE = "| --------------- | ----------------- | --------------- | ------------------------ | ---------------- |";
public static final String REPORT_LINE_HEADER = "| %-15s | count | time | msgs/sec | bytes/sec |\n";
public static final String REPORT_LINE_FORMAT = "| %-15s | %12s msgs | %12s ms | %15s msgs/sec | %12s/sec |\n";
public static final String REPORT_SEP_LINE = "| ------------------- | ----------------- | --------------- | ------------------------ | ---------------- |";
public static final String REPORT_LINE_HEADER = "| %-19s | count | time | msgs/sec | bytes/sec |\n";
public static final String REPORT_LINE_FORMAT = "| %-19s | %12s msgs | %12s ms | %15s msgs/sec | %12s/sec |\n";

public static final String RTT_REPORT_SEP_LINE = "| --------------- | ------------ | ------------------ | ------------------ |";
public static final String RTT_REPORT_LINE_HEADER = "| %-15s | count | total time | average time |\n";
public static final String RTT_REPORT_LINE_FORMAT = "| %-15s | %12s | %12s ms | %15s ms |\n";
public static final String RTT_REPORT_SEP_LINE = "| ------------------- | ------------ | ------------------ | ------------------ |";
public static final String RTT_REPORT_LINE_HEADER = "| %-19s | count | total time | average time |\n";
public static final String RTT_REPORT_LINE_FORMAT = "| %-19s | %12s | %12s ms | %15s ms |\n";

public static final String LT_REPORT_SEP_LINE = "| --------------- | ------------------------ | ---------------- | ------------------------ | ---------------- | ------------------------ | ---------------- |";
public static final String LT_REPORT_SEP_LINE = "| ------------------- | ------------------------ | ---------------- | ------------------------ | ---------------- | ------------------------ | ---------------- |";
public static final String LT_REPORT_LINE_HEADER = "| Latency Total | Publish to Server Created | Server Created to Consumer Received | Publish to Consumer Received |";
public static final String LT_REPORT_LINE_FORMAT = "| %-15s | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec |\n";
public static final String LT_REPORT_LINE_FORMAT = "| %-19s | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec | %15s msgs/sec | %12s/sec |\n";

public static final String LM_REPORT_SEP_LINE = "| ----------------- | ------------------- | ------------------- | ------------------- |";
public static final String LM_REPORT_SEP_LINE = "| ------------------- | ------------------- | ------------------- | ------------------- |";
public static final String LM_REPORT_LINE_HEADER = "| Latency Message | Publish to Server | Server to Consumer | Publish to Consumer |";
public static final String LM_REPORT_LINE_FORMAT = "| %17s | %15s ms | %15s ms | %15s ms |\n";
public static final String LM_REPORT_LINE_FORMAT = "| %19s | %15s ms | %15s ms | %15s ms |\n";

public static final String LCSV_HEADER = "Publish Time,Server Time,Received Time,Publish to Server,Server to Consumer,Publish to Consumer\n";

Expand Down

0 comments on commit c5c0ec9

Please sign in to comment.