diff --git a/src/main/java/io/nats/client/Connection.java b/src/main/java/io/nats/client/Connection.java index 4e6930e56..bf856daf6 100644 --- a/src/main/java/io/nats/client/Connection.java +++ b/src/main/java/io/nats/client/Connection.java @@ -404,6 +404,24 @@ enum Status { */ void closeDispatcher(Dispatcher dispatcher); + /** + * Attach another ConnectionListener. + * + *

The ConnectionListener will only receive Connection events arriving after it has been attached. When + * a Connection event is raised, the invocation order and parallelism of multiple ConnectionListeners is not + * specified. + * + * @param connectionListener the ConnectionListener to attach + */ + void addConnectionListener(ConnectionListener connectionListener); + + /** + * Detach a ConnectionListioner. This will cease delivery of any further Connection events to this instance. + * + * @param connectionListener the ConnectionListener to detach + */ + void removeConnectionListener(ConnectionListener connectionListener); + /** * Flush the connection's buffer of outgoing messages, including sending a * protocol message to and from the server. Passing null is equivalent to diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index bc39646b8..613c24903 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -75,6 +75,7 @@ class NatsConnection implements Connection { private final Map subscribers; private final Map dispatchers; // use a concurrent map so we get more consistent iteration // behavior + private final Collection connectionListeners; private final Map responsesAwaiting; private final Map responsesRespondedTo; private final ConcurrentLinkedDeque> pongQueue; @@ -116,6 +117,11 @@ class NatsConnection implements Connection { this.reconnectWaiter = new CompletableFuture<>(); this.reconnectWaiter.complete(Boolean.TRUE); + this.connectionListeners = ConcurrentHashMap.newKeySet(); + if (options.getConnectionListener() != null) { + addConnectionListener(options.getConnectionListener()); + } + this.dispatchers = new ConcurrentHashMap<>(); this.subscribers = new ConcurrentHashMap<>(); this.responsesAwaiting = new ConcurrentHashMap<>(); @@ -1272,6 +1278,14 @@ Map getDispatchers() { return Collections.unmodifiableMap(dispatchers); } + public void addConnectionListener(ConnectionListener connectionListener) { + connectionListeners.add(connectionListener); + } + + public void removeConnectionListener(ConnectionListener connectionListener) { + connectionListeners.remove(connectionListener); + } + public void flush(Duration timeout) throws TimeoutException, InterruptedException { Instant start = Instant.now(); @@ -1642,16 +1656,17 @@ void executeCallback(ErrorListenerCaller elc) { } void processConnectionEvent(Events type) { - ConnectionListener listener = this.options.getConnectionListener(); - if (listener != null && !this.callbackRunner.isShutdown()) { + if (!this.callbackRunner.isShutdown()) { try { - this.callbackRunner.execute(() -> { - try { - listener.connectionEvent(this, type); - } catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); + for (ConnectionListener listener : connectionListeners) { + this.callbackRunner.execute(() -> { + try { + listener.connectionEvent(this, type); + } catch (Exception ex) { + this.statistics.incrementExceptionCount(); + } + }); + } } catch (RejectedExecutionException re) { // Timing with shutdown, let it go } diff --git a/src/test/java/io/nats/client/impl/ConnectionListenerTests.java b/src/test/java/io/nats/client/impl/ConnectionListenerTests.java index bc3aa6231..26c76cc40 100644 --- a/src/test/java/io/nats/client/impl/ConnectionListenerTests.java +++ b/src/test/java/io/nats/client/impl/ConnectionListenerTests.java @@ -18,6 +18,10 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import static io.nats.client.utils.TestBase.*; @@ -111,4 +115,44 @@ public void testExceptionInConnectionListener() throws Exception { assertTrue(((NatsConnection)nc).getNatsStatistics().getExceptions() > 0); } } + + @Test + public void testMultipleConnectionListeners() throws Exception { + Set capturedEvents = ConcurrentHashMap.newKeySet(); + + try (NatsTestServer ts = new NatsTestServer(false)) { + TestHandler handler = new TestHandler(); + Options options = new Options.Builder(). + server(ts.getURI()). + connectionListener(handler). + build(); + Connection nc = standardConnection(options); + assertEquals(ts.getURI(), nc.getConnectedUrl()); + + assertThrows(NullPointerException.class, () -> nc.addConnectionListener(null)); + assertThrows(NullPointerException.class, () -> nc.removeConnectionListener(null)); + + ConnectionListener removedConnectionListener = (conn, event) -> capturedEvents.add("NEVER INVOKED"); + nc.addConnectionListener(removedConnectionListener); + nc.addConnectionListener((conn, event) -> capturedEvents.add("CL1-" + event.name())); + nc.addConnectionListener((conn, event) -> capturedEvents.add("CL2-" + event.name())); + nc.addConnectionListener((conn, event) -> { throw new RuntimeException("should not interfere with other listeners"); }); + nc.addConnectionListener((conn, event) -> capturedEvents.add("CL3-" + event.name())); + nc.addConnectionListener((conn, event) -> capturedEvents.add("CL4-" + event.name())); + nc.removeConnectionListener(removedConnectionListener); + + standardCloseConnection(nc); + assertNull(nc.getConnectedUrl()); + assertEquals(1, handler.getEventCount(Events.CLOSED)); + assertTrue(((NatsConnection)nc).getNatsStatistics().getExceptions() > 0); + } + + Set expectedEvents = new HashSet<>(Arrays.asList( + "CL1-CLOSED", + "CL2-CLOSED", + "CL3-CLOSED", + "CL4-CLOSED")); + + assertEquals(expectedEvents, capturedEvents); + } } \ No newline at end of file