Skip to content

Commit

Permalink
Ensure NatsConnection inboxDispatcher is started prior to publishing …
Browse files Browse the repository at this point in the history
…messages (#1109)

* Ensure NatsConnection inboxDispatcher is started prior to publishing messages Fixes #1065

If multiple treads call publish messages at the same time, some messages may not be published because the inboxDispater is not started yet.  Change updates NatsConnection to ensure the inboxDispatcher is started prior to publishing messages.

* Fix nats_server_path environment variable in README.md
  • Loading branch information
nathanschile authored Apr 4, 2024
1 parent ea126d6 commit d9eec8b
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 15 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,7 @@ The java doc is located in `build/docs` and the example jar is in `build/libs`.
which will create a folder called `build/reports/jacoco` containing the file `index.html` you can open and use to browse the coverage. Keep in mind we have focused on library test coverage, not coverage for the examples.
Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_-_server_path`.
Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_server_path`.
## TLS Certs
Expand Down
27 changes: 16 additions & 11 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class NatsConnection implements Connection {

private final String mainInbox;
private final AtomicReference<NatsDispatcher> inboxDispatcher;
private final ReentrantLock inboxDispatcherLock;
private Timer timer;

private final AtomicBoolean needPing;
Expand Down Expand Up @@ -147,6 +148,7 @@ class NatsConnection implements Connection {

this.serverInfo = new AtomicReference<>();
this.inboxDispatcher = new AtomicReference<>();
this.inboxDispatcherLock = new ReentrantLock();
this.pongQueue = new ConcurrentLinkedDeque<>();
this.draining = new AtomicReference<>();
this.blockPublishForDrain = new AtomicBoolean();
Expand Down Expand Up @@ -1186,17 +1188,20 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
}

if (inboxDispatcher.get() == null) {
NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);

// Theoretically two threads could be here
// compareAndSet returns false if thread 2 set the dispatcher
// in between the time thread 1 did get above and tried to compareAndSet
// really thin edge condition - could have used a lock, but this is probably enough
if (inboxDispatcher.compareAndSet(null, d)) {
String id = this.nuid.next();
this.dispatchers.put(id, d);
d.start(id);
d.subscribe(this.mainInbox);
inboxDispatcherLock.lock();
try {
if (inboxDispatcher.get() == null) {
NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);

// Ensure the dispatcher is started before publishing messages
String id = this.nuid.next();
this.dispatchers.put(id, d);
d.start(id);
d.subscribe(this.mainInbox);
inboxDispatcher.set(d);
}
} finally {
inboxDispatcherLock.unlock();
}
}

Expand Down
36 changes: 33 additions & 3 deletions src/test/java/io/nats/client/impl/JetStreamPubTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import static org.junit.jupiter.api.Assertions.*;

Expand Down Expand Up @@ -184,6 +182,38 @@ public void testPublishAsyncVarieties() throws Exception {
});
}

@Test
public void testMultithreadedPublishAsync() throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
jsServer.run(nc -> {
TestingStreamContainer tsc = new TestingStreamContainer(nc);
final int messagesToPublish = 6;
// create a new connection that does not have the inbox dispatcher set
try (NatsConnection nc2 = new NatsConnection(nc.getOptions())){
nc2.connect(true);
JetStream js = nc2.jetStream();

List<Future<CompletableFuture<PublishAck>>> futures = new ArrayList<>();
for (int i = 0; i < messagesToPublish; i++) {
final Future<CompletableFuture<PublishAck>> submitFuture = executorService.submit(() ->
js.publishAsync(tsc.subject(), dataBytes(1)));
futures.add(submitFuture);
}
// verify all messages were published
for (int i = 0; i < messagesToPublish; i++) {
CompletableFuture<PublishAck> future = futures.get(i).get(200, TimeUnit.MILLISECONDS);
PublishAck pa = future.get(200, TimeUnit.MILLISECONDS);
assertEquals(tsc.stream, pa.getStream());
assertFalse(pa.isDuplicate());
}
}
});
} finally {
executorService.shutdownNow();
}
}

private void assertFutureIOException(CompletableFuture<PublishAck> future) {
ExecutionException ee = assertThrows(ExecutionException.class, future::get);
assertTrue(ee.getCause() instanceof RuntimeException);
Expand Down

0 comments on commit d9eec8b

Please sign in to comment.