From 5f8159d0f392108f44a8937563385f704f406733 Mon Sep 17 00:00:00 2001 From: atakavci Date: Sun, 15 Dec 2024 04:24:21 +0300 Subject: [PATCH] fix ping issue with pubsub --- .../java/redis/clients/jedis/JedisPubSubBase.java | 13 +++++++++++-- .../redis/clients/jedis/JedisShardedPubSubBase.java | 6 +++++- .../TokenBasedAuthenticationUnitTests.java | 13 +++++++------ 3 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisPubSubBase.java b/src/main/java/redis/clients/jedis/JedisPubSubBase.java index 4f72f546d7..91fee36c58 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSubBase.java +++ b/src/main/java/redis/clients/jedis/JedisPubSubBase.java @@ -86,7 +86,13 @@ public final void ping() { } public final void ping(T argument) { - sendAndFlushCommand(Command.PING, argument); + authenticator.commandSync.lock(); + try { + sendAndFlushCommand(Command.PING, argument); + authenticator.resultHandler.add(pingResultHandler); + } finally { + authenticator.commandSync.unlock(); + } } public final boolean isSubscribed() { @@ -179,7 +185,10 @@ private void process() { throw new JedisException("Unknown message type: " + firstObj); } } else if (reply instanceof byte[]) { - Consumer resultHandler = authenticator.resultHandler.remove(); + Consumer resultHandler = authenticator.resultHandler.poll(); + if (resultHandler == null) { + throw new JedisException("Unexpected message : " + SafeEncoder.encode((byte[]) reply)); + } resultHandler.accept(reply); } else { throw new JedisException("Unknown message type: " + reply); diff --git a/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java b/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java index a52e9fbadf..9020693929 100644 --- a/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java +++ b/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java @@ -8,6 +8,7 @@ import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.util.SafeEncoder; public abstract class JedisShardedPubSubBase { @@ -101,7 +102,10 @@ private void process() { throw new JedisException("Unknown message type: " + firstObj); } } else if (reply instanceof byte[]) { - Consumer resultHandler = authenticator.resultHandler.remove(); + Consumer resultHandler = authenticator.resultHandler.poll(); + if (resultHandler == null) { + throw new JedisException("Unexpected message : " + SafeEncoder.encode((byte[]) reply)); + } resultHandler.accept(reply); } else { throw new JedisException("Unknown message type: " + reply); diff --git a/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationUnitTests.java b/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationUnitTests.java index eb345fffc6..ce5f3b9245 100644 --- a/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationUnitTests.java +++ b/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationUnitTests.java @@ -63,7 +63,7 @@ public void withExpirationRefreshRatio_testJedisAuthXManagerTriggersEvict() thro IdentityProvider idProvider = mock(IdentityProvider.class); when(idProvider.requestToken()) - .thenReturn(new SimpleToken("default","password", System.currentTimeMillis() + 1000, + .thenReturn(new SimpleToken("default", "password", System.currentTimeMillis() + 1000, System.currentTimeMillis(), Collections.singletonMap("oid", "default"))); TokenManager tokenManager = new TokenManager(idProvider, @@ -88,7 +88,7 @@ public void withLowerRefreshBounds_testJedisAuthXManagerTriggersEvict() throws E IdentityProvider idProvider = mock(IdentityProvider.class); when(idProvider.requestToken()) - .thenReturn(new SimpleToken("default","password", System.currentTimeMillis() + 1000, + .thenReturn(new SimpleToken("default", "password", System.currentTimeMillis() + 1000, System.currentTimeMillis(), Collections.singletonMap("oid", "default"))); TokenManager tokenManager = new TokenManager(idProvider, @@ -205,7 +205,7 @@ public void testCalculateRenewalDelay() { public void testAuthXManagerReceivesNewToken() throws InterruptedException, ExecutionException, TimeoutException { - IdentityProvider identityProvider = () -> new SimpleToken("user1","tokenVal", + IdentityProvider identityProvider = () -> new SimpleToken("user1", "tokenVal", System.currentTimeMillis() + 5 * 1000, System.currentTimeMillis(), Collections.singletonMap("oid", "user1")); @@ -277,7 +277,7 @@ public void testTokenManagerWithFailingTokenRequest() if (requesLatch.getCount() > 0) { throw new RuntimeException("Test exception from identity provider!"); } - return new SimpleToken("user1","tokenValX", System.currentTimeMillis() + 50 * 1000, + return new SimpleToken("user1", "tokenValX", System.currentTimeMillis() + 50 * 1000, System.currentTimeMillis(), Collections.singletonMap("oid", "user1")); }); @@ -289,9 +289,10 @@ public void testTokenManagerWithFailingTokenRequest() TokenListener listener = mock(TokenListener.class); tokenManager.start(listener, false); requesLatch.await(); + await().pollDelay(ONE_HUNDRED_MILLISECONDS).atMost(FIVE_HUNDRED_MILLISECONDS) + .untilAsserted(() -> verify(listener).onTokenRenewed(argument.capture())); verify(identityProvider, times(numberOfRetries)).requestToken(); verify(listener, never()).onError(any()); - verify(listener).onTokenRenewed(argument.capture()); assertEquals("tokenValX", argument.getValue().getValue()); } @@ -313,7 +314,7 @@ public void testTokenManagerWithHangingTokenRequest() } return null; } - return new SimpleToken("user1","tokenValX", System.currentTimeMillis() + tokenLifetime, + return new SimpleToken("user1", "tokenValX", System.currentTimeMillis() + tokenLifetime, System.currentTimeMillis(), Collections.singletonMap("oid", "user1")); };