diff --git a/README.md b/README.md index c6c85155a..13eabcb56 100644 --- a/README.md +++ b/README.md @@ -1221,7 +1221,7 @@ The java doc is located in `build/docs` and the example jar is in `build/libs`. which will create a folder called `build/reports/jacoco` containing the file `index.html` you can open and use to browse the coverage. Keep in mind we have focused on library test coverage, not coverage for the examples. -Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_-_server_path`. +Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_server_path`. ## TLS Certs diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 9aff4bf22..ebf62f16c 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -81,6 +81,7 @@ class NatsConnection implements Connection { private final String mainInbox; private final AtomicReference inboxDispatcher; + private final ReentrantLock inboxDispatcherLock; private Timer timer; private final AtomicBoolean needPing; @@ -147,6 +148,7 @@ class NatsConnection implements Connection { this.serverInfo = new AtomicReference<>(); this.inboxDispatcher = new AtomicReference<>(); + this.inboxDispatcherLock = new ReentrantLock(); this.pongQueue = new ConcurrentLinkedDeque<>(); this.draining = new AtomicReference<>(); this.blockPublishForDrain = new AtomicBoolean(); @@ -1186,17 +1188,20 @@ CompletableFuture requestFutureInternal(String subject, Headers headers } if (inboxDispatcher.get() == null) { - NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply); - - // Theoretically two threads could be here - // compareAndSet returns false if thread 2 set the dispatcher - // in between the time thread 1 did get above and tried to compareAndSet - // really thin edge condition - could have used a lock, but this is probably enough - if (inboxDispatcher.compareAndSet(null, d)) { - String id = this.nuid.next(); - this.dispatchers.put(id, d); - d.start(id); - d.subscribe(this.mainInbox); + inboxDispatcherLock.lock(); + try { + if (inboxDispatcher.get() == null) { + NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply); + + // Ensure the dispatcher is started before publishing messages + String id = this.nuid.next(); + this.dispatchers.put(id, d); + d.start(id); + d.subscribe(this.mainInbox); + inboxDispatcher.set(d); + } + } finally { + inboxDispatcherLock.unlock(); } } diff --git a/src/test/java/io/nats/client/impl/JetStreamPubTests.java b/src/test/java/io/nats/client/impl/JetStreamPubTests.java index 7c9802a37..2e8ab7d6a 100644 --- a/src/test/java/io/nats/client/impl/JetStreamPubTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamPubTests.java @@ -23,9 +23,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import static org.junit.jupiter.api.Assertions.*; @@ -184,6 +182,38 @@ public void testPublishAsyncVarieties() throws Exception { }); } + @Test + public void testMultithreadedPublishAsync() throws Exception { + final ExecutorService executorService = Executors.newFixedThreadPool(3); + try { + jsServer.run(nc -> { + TestingStreamContainer tsc = new TestingStreamContainer(nc); + final int messagesToPublish = 6; + // create a new connection that does not have the inbox dispatcher set + try (NatsConnection nc2 = new NatsConnection(nc.getOptions())){ + nc2.connect(true); + JetStream js = nc2.jetStream(); + + List>> futures = new ArrayList<>(); + for (int i = 0; i < messagesToPublish; i++) { + final Future> submitFuture = executorService.submit(() -> + js.publishAsync(tsc.subject(), dataBytes(1))); + futures.add(submitFuture); + } + // verify all messages were published + for (int i = 0; i < messagesToPublish; i++) { + CompletableFuture future = futures.get(i).get(200, TimeUnit.MILLISECONDS); + PublishAck pa = future.get(200, TimeUnit.MILLISECONDS); + assertEquals(tsc.stream, pa.getStream()); + assertFalse(pa.isDuplicate()); + } + } + }); + } finally { + executorService.shutdownNow(); + } + } + private void assertFutureIOException(CompletableFuture future) { ExecutionException ee = assertThrows(ExecutionException.class, future::get); assertTrue(ee.getCause() instanceof RuntimeException);