subscribe is not working when the client cache is enabled #4023

Open
qingdaoheze opened this issue Nov 14, 2024 · 3 comments

qingdaoheze commented Nov 14, 2024

Expected behavior

subscribe should work normally and deliver published messages to the listener.

Actual behavior

The subscribe call blocks forever and the listener never receives any message.

Steps to reproduce:

1. Create a UnifiedJedis with the client cache enabled.
2. Call subscribe.

Redis / Jedis Configuration

    private UnifiedJedis buildUnifiedJedis() {
        HostAndPort startNode = HostAndPort.from("localhost:6379");
        JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder()
                .resp3()
                .build();
        CacheConfig cacheConfig = getCacheConfig();
        Cache cache = CacheFactory.getCache(cacheConfig);

        return new UnifiedJedis(startNode, jedisClientConfig, cache);
    }

Jedis version:

5.2.0

Redis version:

7.4.1

Java version:

JDK 17

@qingdaoheze
Author

Complete reproduction code:

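import org.junit.jupiter.api.Test; // assumption: JUnit 5; use org.junit.Test for JUnit 4
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.DefaultJedisClientConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisClientConfig;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.UnifiedJedis;
import redis.clients.jedis.csc.Cache;
import redis.clients.jedis.csc.CacheConfig;
import redis.clients.jedis.csc.CacheFactory;

public class JedisSubscribeWithClientCacheTest {
    private static final Logger log = LoggerFactory.getLogger(JedisSubscribeWithClientCacheTest.class);
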
    private UnifiedJedis buildUnifiedJedis() {
        HostAndPort startNode = HostAndPort.from("localhost:6379");
        JedisClientConfig jedisClientConfig = DefaultJedisClientConfig.builder()
                .resp3()
                .build();
        CacheConfig cacheConfig = CacheConfig.builder()
                .maxSize(10000)
                .build();
        Cache cache = CacheFactory.getCache(cacheConfig);

        return new UnifiedJedis(startNode, jedisClientConfig, cache);
    }

    @Test
    public void pubsubWithClientCache() {
        try (UnifiedJedis client = buildUnifiedJedis()) {
            JedisPubSub jedisPubSub = new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    log.info("onMessage. channel:{}, message:{}", channel, message);
                }

                @Override
                public void onPMessage(String pattern, String channel, String message) {
                    log.info("onPMessage. pattern:{}, channel:{}, message:{}", pattern, channel, message);
                }

                @Override
                public void onSubscribe(String channel, int subscribedChannels) {
                    log.info("onSubscribe. channel:{}, subscribedChannels:{}", channel, subscribedChannels);
                }

                @Override
                public void onUnsubscribe(String channel, int subscribedChannels) {
                    log.info("onUnsubscribe. channel:{}, subscribedChannels:{}", channel, subscribedChannels);
                }

                @Override
                public void onPUnsubscribe(String pattern, int subscribedChannels) {
                    log.info("onPUnsubscribe. pattern:{}, subscribedChannels:{}", pattern, subscribedChannels);
                }

                @Override
                public void onPSubscribe(String pattern, int subscribedChannels) {
                    log.info("onPSubscribe. pattern:{}, subscribedChannels:{}", pattern, subscribedChannels);
                }

                @Override
                public void onPong(String pattern) {
                    log.info("onPong. pattern:{}", pattern);
                }
            };
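            // subscribe() blocks the calling thread while the subscription is active,
            // so the read below is only reached after the subscription ends.
            client.subscribe(jedisPubSub, "hello_world");
            System.in.read();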
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
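
Because subscribe blocks the calling thread, the test above hangs instead of failing. One way to turn the hang into a pass/fail result is to run the blocking call on a daemon thread and wait for the first callback with a timeout. The following is only a minimal, stdlib-only sketch of that idea: `SubscribeHangProbe`, `callbackWithin`, and the two stand-in subscribers are hypothetical names made up for illustration and are not part of Jedis.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SubscribeHangProbe {
    /**
     * Runs a blocking subscribe-style call on a daemon thread and reports
     * whether it invoked the supplied callback within the timeout.
     * The hook is a hypothetical stand-in for client.subscribe(...).
     */
    static boolean callbackWithin(Consumer<Runnable> blockingSubscribe, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch called = new CountDownLatch(1);
        Thread worker = new Thread(
                () -> blockingSubscribe.accept(called::countDown), "subscribe-probe");
        worker.setDaemon(true); // a hung subscribe must not keep the JVM alive
        worker.start();
        return called.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a healthy subscriber: fires the callback, then keeps blocking.
        Consumer<Runnable> healthy = onCallback -> {
            onCallback.run();
            sleepQuietly(60_000);
        };
        // Stand-in for the behavior reported here: blocks and never fires the callback.
        Consumer<Runnable> hung = onCallback -> sleepQuietly(60_000);

        System.out.println("healthy: " + callbackWithin(healthy, 1_000)); // prints "healthy: true"
        System.out.println("hung: " + callbackWithin(hung, 200));         // prints "hung: false"
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the reproduction test, the hook would presumably wrap `client.subscribe(jedisPubSub, "hello_world")`, with `onMessage` invoking the callback after a message is published from a second client; that wiring is an assumption and is not shown here.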

@qingdaoheze
Author

The stack trace of the blocked thread is as follows:

   sun.nio.ch.Net.poll line: not available [native method]
   sun.nio.ch.NioSocketImpl.park line: 186 
   sun.nio.ch.NioSocketImpl.park line: 195 
   sun.nio.ch.NioSocketImpl.implRead line: 319 
   sun.nio.ch.NioSocketImpl.read line: 355 
   sun.nio.ch.NioSocketImpl$1.read line: 808 
   java.net.Socket$SocketInputStream.read line: 966 
   java.io.InputStream.read line: 218 
   redis.clients.jedis.util.RedisInputStream.ensureFill line: 256 
   redis.clients.jedis.util.RedisInputStream.peek line: 50 
   redis.clients.jedis.Protocol.readPushes line: 242 
   redis.clients.jedis.Protocol.read line: 226 
   redis.clients.jedis.csc.CacheConnection.protocolRead line: 49 
   redis.clients.jedis.Connection.readProtocolWithCheckingBroken line: 392 
   redis.clients.jedis.Connection.getUnflushedObject line: 349 
   redis.clients.jedis.JedisPubSubBase.process line: 115 
   redis.clients.jedis.JedisPubSubBase.proceed line: 92 
   redis.clients.jedis.UnifiedJedis.subscribe line: 3833
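
The trace shows the thread parked in a socket read underneath RedisInputStream.ensureFill and Protocol.readPushes, which means it is waiting for bytes that the server has not sent. The sketch below is not Jedis code; it only illustrates the same blocking pattern with the JDK's own sockets, using a local peer that accepts the connection and then stays silent. `SilentPeerRead` and `readTimesOut` are hypothetical names, and the read timeout is added purely so the example terminates.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

class SilentPeerRead {
    /** Returns true if reading from a peer that never writes ends in a timeout. */
    static boolean readTimesOut(int timeoutMillis) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback); // ephemeral port
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {                        // connected, but never writes
            client.setSoTimeout(timeoutMillis); // with no timeout set, read() would block indefinitely
            InputStream in = client.getInputStream();
            try {
                in.read(); // waits for bytes, like the parked frame in the trace
                return false;
            } catch (SocketTimeoutException expected) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("timed out: " + readTimesOut(300)); // prints "timed out: true"
    }
}
```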

@sazzad16
Collaborator

Thank you @qingdaoheze. We'll look into this.
