diff --git a/src/main/java/redis/clients/jedis/Connection.java b/src/main/java/redis/clients/jedis/Connection.java index c14dfa08f6..de473d0b8e 100644 --- a/src/main/java/redis/clients/jedis/Connection.java +++ b/src/main/java/redis/clients/jedis/Connection.java @@ -46,9 +46,8 @@ public class Connection implements Closeable { private String strVal; protected String server; protected String version; - protected AtomicReference currentCredentials = new AtomicReference( - null); - private boolean isTokenBasedAuthenticationEnabled = false; + private AtomicReference currentCredentials = new AtomicReference<>(null); + private AuthXManager authXManager; public Connection() { this(Protocol.DEFAULT_HOST, Protocol.DEFAULT_PORT); @@ -68,6 +67,7 @@ public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientC public Connection(final JedisSocketFactory socketFactory) { this.socketFactory = socketFactory; + this.authXManager = null; } public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig) { @@ -458,9 +458,8 @@ protected void initializeFromClientConfig(final JedisClientConfig config) { Supplier credentialsProvider = config.getCredentialsProvider(); - AuthXManager authXManager = config.getAuthXManager(); + authXManager = config.getAuthXManager(); if (authXManager != null) { - isTokenBasedAuthenticationEnabled = true; credentialsProvider = authXManager; } @@ -608,7 +607,11 @@ public boolean ping() { return true; } - public boolean isTokenBasedAuthenticationEnabled() { - return isTokenBasedAuthenticationEnabled; + protected boolean isTokenBasedAuthenticationEnabled() { + return authXManager != null; + } + + protected AuthXManager getAuthXManager() { + return authXManager; } } diff --git a/src/main/java/redis/clients/jedis/JedisPubSubBase.java b/src/main/java/redis/clients/jedis/JedisPubSubBase.java index 5c96278fb9..4f72f546d7 100644 --- a/src/main/java/redis/clients/jedis/JedisPubSubBase.java +++ b/src/main/java/redis/clients/jedis/JedisPubSubBase.java @@ -4,6 +4,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Consumer; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.exceptions.JedisException; @@ -12,7 +13,8 @@ public abstract class JedisPubSubBase { private int subscribedChannels = 0; - private volatile Connection client; + private final JedisSafeAuthenticator authenticator = new JedisSafeAuthenticator(); + private final Consumer pingResultHandler = this::processPingReply; public void onMessage(T channel, T message) { } @@ -36,12 +38,7 @@ public void onPong(T pattern) { } private void sendAndFlushCommand(Command command, T... args) { - if (client == null) { - throw new JedisException(getClass() + " is not connected to a Connection."); - } - CommandArguments cargs = new CommandArguments(command).addObjects(args); - client.sendCommand(cargs); - client.flush(); + authenticator.sendAndFlushCommand(command, args); } public final void unsubscribe() { @@ -63,7 +60,8 @@ public final void psubscribe(T... patterns) { } private void checkConnectionSuitableForPubSub() { - if (client.protocol == RedisProtocol.RESP2 && client.isTokenBasedAuthenticationEnabled()) { + if (authenticator.client.protocol != RedisProtocol.RESP3 + && authenticator.client.isTokenBasedAuthenticationEnabled()) { throw new JedisException( "Blocking pub/sub operations are not supported on token-based authentication enabled connections with RESP2 protocol!"); } @@ -78,7 +76,13 @@ public final void punsubscribe(T... patterns) { } public final void ping() { - sendAndFlushCommand(Command.PING); + authenticator.commandSync.lock(); + try { + sendAndFlushCommand(Command.PING); + authenticator.resultHandler.add(pingResultHandler); + } finally { + authenticator.commandSync.unlock(); + } } public final void ping(T argument) { @@ -94,24 +98,24 @@ public final int getSubscribedChannels() { } public final void proceed(Connection client, T... channels) { - this.client = client; - this.client.setTimeoutInfinite(); + authenticator.registerForAuthentication(client); + authenticator.client.setTimeoutInfinite(); try { subscribe(channels); process(); } finally { - this.client.rollbackTimeout(); + authenticator.client.rollbackTimeout(); } } public final void proceedWithPatterns(Connection client, T... patterns) { - this.client = client; - this.client.setTimeoutInfinite(); + authenticator.registerForAuthentication(client); + authenticator.client.setTimeoutInfinite(); try { psubscribe(patterns); process(); } finally { - this.client.rollbackTimeout(); + authenticator.client.rollbackTimeout(); } } @@ -121,7 +125,7 @@ public final void proceedWithPatterns(Connection client, T... patterns) { private void process() { do { - Object reply = client.getUnflushedObject(); + Object reply = authenticator.client.getUnflushedObject(); if (reply instanceof List) { List listReply = (List) reply; @@ -175,12 +179,8 @@ private void process() { throw new JedisException("Unknown message type: " + firstObj); } } else if (reply instanceof byte[]) { - byte[] resp = (byte[]) reply; - if ("PONG".equals(SafeEncoder.encode(resp))) { - onPong(null); - } else { - onPong(encode(resp)); - } + Consumer resultHandler = authenticator.resultHandler.remove(); + resultHandler.accept(reply); } else { throw new JedisException("Unknown message type: " + reply); } @@ -189,4 +189,13 @@ private void process() { // /* Invalidate instance since this thread is no longer listening */ // this.client = null; } + + private void processPingReply(Object reply) { + byte[] resp = (byte[]) reply; + if ("PONG".equals(SafeEncoder.encode(resp))) { + onPong(null); + } else { + onPong(encode(resp)); + } + } } diff --git a/src/main/java/redis/clients/jedis/JedisSafeAuthenticator.java b/src/main/java/redis/clients/jedis/JedisSafeAuthenticator.java new file mode 100644 index 0000000000..16b72f1684 --- /dev/null +++ b/src/main/java/redis/clients/jedis/JedisSafeAuthenticator.java @@ -0,0 +1,104 @@ +package redis.clients.jedis; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import redis.clients.authentication.core.SimpleToken; +import redis.clients.authentication.core.Token; +import redis.clients.jedis.Protocol.Command; +import redis.clients.jedis.authentication.JedisAuthenticationException; +import redis.clients.jedis.exceptions.JedisException; +import redis.clients.jedis.util.SafeEncoder; + +public class JedisSafeAuthenticator { + + private static final Token PLACEHOLDER_TOKEN = new SimpleToken(null, null, 0, 0, null); + private static final Logger logger = LoggerFactory.getLogger(JedisSafeAuthenticator.class); + + protected volatile Connection client; + protected final Consumer authResultHandler = this::processAuthReply; + protected final Consumer authenticationHandler = this::safeReAuthenticate; + + protected final AtomicReference pendingTokenRef = new AtomicReference(null); + protected final ReentrantLock commandSync = new ReentrantLock(); + protected final Queue> resultHandler = new ConcurrentLinkedQueue>(); + + protected void sendAndFlushCommand(Command command, Object... args) { + if (client == null) { + throw new JedisException(getClass() + " is not connected to a Connection."); + } + CommandArguments cargs = new CommandArguments(command).addObjects(args); + + Token newToken = pendingTokenRef.getAndSet(PLACEHOLDER_TOKEN); + + // lets send the command without locking !!IF!! we know that pendingTokenRef is null replaced with PLACEHOLDER_TOKEN and no re-auth will go into action + // !!ELSE!! we are locking since we already know a re-auth is still in progress in another thread and we need to wait for it to complete, we do nothing but wait on it! + if (newToken != null) { + commandSync.lock(); + } + try { + client.sendCommand(cargs); + client.flush(); + } finally { + Token newerToken = pendingTokenRef.getAndSet(null); + // lets check if a newer token received since the beginning of this sendAndFlushCommand call + if (newerToken != null && newerToken != PLACEHOLDER_TOKEN) { + safeReAuthenticate(newerToken); + } + if (newToken != null) { + commandSync.unlock(); + } + } + } + + protected void registerForAuthentication(Connection newClient) { + Connection oldClient = this.client; + if (oldClient == newClient) return; + if (oldClient != null && oldClient.getAuthXManager() != null) { + oldClient.getAuthXManager().removePostAuthenticationHook(authenticationHandler); + } + if (newClient != null && newClient.getAuthXManager() != null) { + newClient.getAuthXManager().addPostAuthenticationHook(authenticationHandler); + } + this.client = newClient; + } + + private void safeReAuthenticate(Token token) { + try { + byte[] rawPass = client.encodeToBytes(token.getValue().toCharArray()); + byte[] rawUser = client.encodeToBytes(token.getUser().toCharArray()); + + Token newToken = pendingTokenRef.getAndSet(token); + if (newToken == null) { + commandSync.lock(); + try { + sendAndFlushCommand(Command.AUTH, rawUser, rawPass); + resultHandler.add(this.authResultHandler); + } finally { + pendingTokenRef.set(null); + commandSync.unlock(); + } + } + } catch (Exception e) { + logger.error("Error while re-authenticating connection", e); + client.getAuthXManager().getListener().onConnectionAuthenticationError(e); + } + } + + protected void processAuthReply(Object reply) { + byte[] resp = (byte[]) reply; + String response = SafeEncoder.encode(resp); + if (!"OK".equals(response)) { + String msg = "Re-authentication failed with server response: " + response; + Exception failedAuth = new JedisAuthenticationException(msg); + logger.error(failedAuth.getMessage(), failedAuth); + client.getAuthXManager().getListener().onConnectionAuthenticationError(failedAuth); + } + } +} diff --git a/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java b/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java index 2b2ce944fe..a52e9fbadf 100644 --- a/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java +++ b/src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java @@ -4,6 +4,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Consumer; import redis.clients.jedis.Protocol.Command; import redis.clients.jedis.exceptions.JedisException; @@ -11,7 +12,7 @@ public abstract class JedisShardedPubSubBase { private int subscribedChannels = 0; - private volatile Connection client; + private final JedisSafeAuthenticator authenticator = new JedisSafeAuthenticator(); public void onSMessage(T channel, T message) { } @@ -23,12 +24,7 @@ public void onSUnsubscribe(T channel, int subscribedChannels) { } private void sendAndFlushCommand(Command command, T... args) { - if (client == null) { - throw new JedisException(getClass() + " is not connected to a Connection."); - } - CommandArguments cargs = new CommandArguments(command).addObjects(args); - client.sendCommand(cargs); - client.flush(); + authenticator.sendAndFlushCommand(command, args); } public final void sunsubscribe() { @@ -40,9 +36,18 @@ public final void sunsubscribe(T... channels) { } public final void ssubscribe(T... channels) { + checkConnectionSuitableForPubSub(); sendAndFlushCommand(Command.SSUBSCRIBE, channels); } + private void checkConnectionSuitableForPubSub() { + if (authenticator.client.protocol != RedisProtocol.RESP3 + && authenticator.client.isTokenBasedAuthenticationEnabled()) { + throw new JedisException( + "Blocking pub/sub operations are not supported on token-based authentication enabled connections with RESP2 protocol!"); + } + } + public final boolean isSubscribed() { return subscribedChannels > 0; } @@ -52,23 +57,22 @@ public final int getSubscribedChannels() { } public final void proceed(Connection client, T... channels) { - this.client = client; - this.client.setTimeoutInfinite(); + authenticator.registerForAuthentication(client); + authenticator.client.setTimeoutInfinite(); try { ssubscribe(channels); process(); } finally { - this.client.rollbackTimeout(); + authenticator.client.rollbackTimeout(); } } protected abstract T encode(byte[] raw); -// private void process(Client client) { private void process() { do { - Object reply = client.getUnflushedObject(); + Object reply = authenticator.client.getUnflushedObject(); if (reply instanceof List) { List listReply = (List) reply; @@ -96,6 +100,9 @@ private void process() { } else { throw new JedisException("Unknown message type: " + firstObj); } + } else if (reply instanceof byte[]) { + Consumer resultHandler = authenticator.resultHandler.remove(); + resultHandler.accept(reply); } else { throw new JedisException("Unknown message type: " + reply); } diff --git a/src/test/java/redis/clients/jedis/authentication/RedisEntraIDIntegrationTests.java b/src/test/java/redis/clients/jedis/authentication/RedisEntraIDIntegrationTests.java index fa920e1daa..55551331ed 100644 --- a/src/test/java/redis/clients/jedis/authentication/RedisEntraIDIntegrationTests.java +++ b/src/test/java/redis/clients/jedis/authentication/RedisEntraIDIntegrationTests.java @@ -270,13 +270,6 @@ public void allConnectionsReauthTest() throws InterruptedException, ExecutionExc } } - // T.3.2 - // Test system behavior when some connections fail to re-authenticate during bulk authentication. e.g when a network partition occurs for 1 or more of them - @Test - public void partialReauthFailureTest() { - - } - // T.3.3 // Verify behavior when attempting to authenticate a single connection with an expired token. @Test diff --git a/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationIntegrationTests.java b/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationIntegrationTests.java index b9fcfa8218..336fa416dd 100644 --- a/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationIntegrationTests.java +++ b/src/test/java/redis/clients/jedis/authentication/TokenBasedAuthenticationIntegrationTests.java @@ -2,21 +2,27 @@ import static org.mockito.Mockito.when; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Durations.ONE_SECOND; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import java.util.Arrays; +import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.MockedStatic; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,16 +31,17 @@ import redis.clients.authentication.core.SimpleToken; import redis.clients.authentication.core.TokenAuthConfig; import redis.clients.jedis.CommandArguments; +import redis.clients.jedis.Connection; /* */ import redis.clients.jedis.DefaultJedisClientConfig; import redis.clients.jedis.EndpointConfig; import redis.clients.jedis.HostAndPorts; import redis.clients.jedis.JedisClientConfig; import redis.clients.jedis.JedisPooled; -import redis.clients.jedis.Protocol; +import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.RedisProtocol; import redis.clients.jedis.Protocol.Command; -import redis.clients.jedis.args.Rawable; -import redis.clients.jedis.commands.ProtocolCommand; +import redis.clients.jedis.exceptions.JedisException; public class TokenBasedAuthenticationIntegrationTests { private static final Logger log = LoggerFactory @@ -62,9 +69,8 @@ public void testJedisPooledForInitialAuth() { String password = endpointConfig.getPassword(); IdentityProvider idProvider = mock(IdentityProvider.class); - when(idProvider.requestToken()) - .thenReturn(new SimpleToken(user, password, System.currentTimeMillis() + 100000, - System.currentTimeMillis(), null)); + when(idProvider.requestToken()).thenReturn(new SimpleToken(user, password, + System.currentTimeMillis() + 100000, System.currentTimeMillis(), null)); IdentityProviderConfig idProviderConfig = mock(IdentityProviderConfig.class); when(idProviderConfig.getProvider()).thenReturn(idProvider); @@ -76,26 +82,175 @@ public void testJedisPooledForInitialAuth() { JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() .authXManager(new AuthXManager(tokenAuthConfig)).build(); - try (MockedStatic mockedStatic = Mockito.mockStatic(Protocol.class)) { - ArgumentCaptor captor = ArgumentCaptor.forClass(CommandArguments.class); + try (JedisPooled jedis = new JedisPooled(endpointConfig.getHostAndPort(), clientConfig)) { + jedis.get("key1"); + } + } + + @Test + public void testJedisPooledReauth() { + String user = "default"; + String password = endpointConfig.getPassword(); + + IdentityProvider idProvider = mock(IdentityProvider.class); + when(idProvider.requestToken()).thenAnswer(invocation -> new SimpleToken(user, password, + System.currentTimeMillis() + 5000, System.currentTimeMillis(), null)); - try (JedisPooled jedis = new JedisPooled(endpointConfig.getHostAndPort(), clientConfig)) { - jedis.get("key1"); + IdentityProviderConfig idProviderConfig = mock(IdentityProviderConfig.class); + when(idProviderConfig.getProvider()).thenReturn(idProvider); + + TokenAuthConfig tokenAuthConfig = TokenAuthConfig.builder() + .identityProviderConfig(idProviderConfig).expirationRefreshRatio(0.8F) + .lowerRefreshBoundMillis(4800).tokenRequestExecTimeoutInMs(1000).build(); + + AuthXManager authXManager = new AuthXManager(tokenAuthConfig); + authXManager = spy(authXManager); + List connections = new ArrayList<>(); + doAnswer(invocation -> { + Connection connection = spy((Connection) invocation.getArgument(0)); + invocation.getArguments()[0] = connection; + connections.add(connection); + Object result = invocation.callRealMethod(); + return result; + }).when(authXManager).addConnection(any(Connection.class)); + + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().authXManager(authXManager) + .build(); + + try (JedisPooled jedis = new JedisPooled(endpointConfig.getHostAndPort(), clientConfig)) { + AtomicBoolean stop = new AtomicBoolean(false); + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> { + while (!stop.get()) { + jedis.get("key1"); + } + }); + + for (Connection connection : connections) { + await().pollDelay(ONE_HUNDRED_MILLISECONDS).atMost(ONE_SECOND).untilAsserted(() -> { + verify(connection, atLeast(3)).reAuthenticate(); + }); } + stop.set(true); + executor.shutdown(); + } + } + + @Test + public void testPubSubForInitialAuth() throws InterruptedException { + String user = "default"; + String password = endpointConfig.getPassword(); - // Verify that the static method was called - mockedStatic.verify(() -> Protocol.sendCommand(any(), captor.capture()), Mockito.atLeast(4)); + IdentityProvider idProvider = mock(IdentityProvider.class); + when(idProvider.requestToken()).thenReturn(new SimpleToken(user, password, + System.currentTimeMillis() + 100000, System.currentTimeMillis(), null)); - CommandArguments commandArgs = captor.getAllValues().get(0); - List args = StreamSupport.stream(commandArgs.spliterator(), false) - .map(Rawable::getRaw).collect(Collectors.toList()); + IdentityProviderConfig idProviderConfig = mock(IdentityProviderConfig.class); + when(idProviderConfig.getProvider()).thenReturn(idProvider); + + TokenAuthConfig tokenAuthConfig = TokenAuthConfig.builder() + .identityProviderConfig(idProviderConfig).expirationRefreshRatio(0.8F) + .lowerRefreshBoundMillis(10000).tokenRequestExecTimeoutInMs(1000).build(); + + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .authXManager(new AuthXManager(tokenAuthConfig)).protocol(RedisProtocol.RESP3).build(); + + JedisPubSub pubSub = new JedisPubSub() { + public void onSubscribe(String channel, int subscribedChannels) { + this.unsubscribe(); + } + }; + + try (JedisPooled jedis = new JedisPooled(endpointConfig.getHostAndPort(), clientConfig)) { + jedis.subscribe(pubSub, "channel1"); + } + } - assertThat(args, - contains(Protocol.Command.AUTH.getRaw(), user.getBytes(), password.getBytes())); + @Test + public void testJedisPubSubReauth() { + String user = "default"; + String password = endpointConfig.getPassword(); + + IdentityProvider idProvider = mock(IdentityProvider.class); + when(idProvider.requestToken()).thenAnswer(invocation -> new SimpleToken(user, password, + System.currentTimeMillis() + 5000, System.currentTimeMillis(), null)); + + IdentityProviderConfig idProviderConfig = mock(IdentityProviderConfig.class); + when(idProviderConfig.getProvider()).thenReturn(idProvider); + + TokenAuthConfig tokenAuthConfig = TokenAuthConfig.builder() + .identityProviderConfig(idProviderConfig).expirationRefreshRatio(0.8F) + .lowerRefreshBoundMillis(4800).tokenRequestExecTimeoutInMs(1000).build(); + + AuthXManager authXManager = new AuthXManager(tokenAuthConfig); + authXManager = spy(authXManager); + List connections = new ArrayList<>(); + doAnswer(invocation -> { + Connection connection = spy((Connection) invocation.getArgument(0)); + invocation.getArguments()[0] = connection; + connections.add(connection); + Object result = invocation.callRealMethod(); + return result; + }).when(authXManager).addConnection(any(Connection.class)); + + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder().authXManager(authXManager) + .protocol(RedisProtocol.RESP3).build(); + + JedisPubSub pubSub = new JedisPubSub() { + }; + try (JedisPooled jedis = new JedisPooled(endpointConfig.getHostAndPort(), clientConfig)) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.submit(() -> { + jedis.subscribe(pubSub, "channel1"); + }); + + await().pollDelay(ONE_HUNDRED_MILLISECONDS).atMost(ONE_SECOND) + .until(pubSub::getSubscribedChannels, greaterThan(0)); + + assertEquals(1, connections.size()); + for (Connection connection : connections) { + await().pollDelay(ONE_HUNDRED_MILLISECONDS).atMost(ONE_SECOND).untilAsserted(() -> { + ArgumentCaptor captor = ArgumentCaptor.forClass(CommandArguments.class); + + verify(connection, atLeast(3)).sendCommand(captor.capture()); + assertThat(captor.getAllValues().stream() + .filter((item) -> item.getCommand() == Command.AUTH).count(), + greaterThan(3L)); + + }); + } + pubSub.unsubscribe(); + executor.shutdown(); + } + } + + @Test + public void testJedisPubSubWithResp2() { + String user = "default"; + String password = endpointConfig.getPassword(); + + IdentityProvider idProvider = mock(IdentityProvider.class); + when(idProvider.requestToken()).thenReturn(new SimpleToken(user, password, + System.currentTimeMillis() + 100000, System.currentTimeMillis(), null)); + + IdentityProviderConfig idProviderConfig = mock(IdentityProviderConfig.class); + when(idProviderConfig.getProvider()).thenReturn(idProvider); + + TokenAuthConfig tokenAuthConfig = TokenAuthConfig.builder() + .identityProviderConfig(idProviderConfig).expirationRefreshRatio(0.8F) + .lowerRefreshBoundMillis(10000).tokenRequestExecTimeoutInMs(1000).build(); + + JedisClientConfig clientConfig = DefaultJedisClientConfig.builder() + .authXManager(new AuthXManager(tokenAuthConfig)).build(); - List cmds = captor.getAllValues().stream().map(item -> item.getCommand()) - .collect(Collectors.toList()); - assertEquals(Arrays.asList(Command.AUTH, Command.CLIENT, Command.CLIENT, Command.GET), cmds); + try (JedisPooled jedis = new JedisPooled(endpointConfig.getHostAndPort(), clientConfig)) { + JedisPubSub pubSub = new JedisPubSub() { + }; + JedisException e = assertThrows(JedisException.class, + () -> jedis.subscribe(pubSub, "channel1")); + assertEquals( + "Blocking pub/sub operations are not supported on token-based authentication enabled connections with RESP2 protocol!", + e.getMessage()); } } }