diff --git a/kotlin-libraries-data/pom.xml b/kotlin-libraries-data/pom.xml index fe41a6e6c..7d4d031e0 100644 --- a/kotlin-libraries-data/pom.xml +++ b/kotlin-libraries-data/pom.xml @@ -95,6 +95,16 @@ ${moshi.version} test + + com.github.codemonstur + embedded-redis + ${embedded-redis.version} + + + io.lettuce + lettuce-core + ${lettuce-core.version} + @@ -161,6 +171,9 @@ 2.9.0 2.35.1 1.15.1 + 1.7.1 + 1.4.3 + 6.5.0.RELEASE \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt new file mode 100644 index 000000000..94d8857c8 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/Message.kt @@ -0,0 +1,3 @@ +package com.baeldung.redispubsub + +data class Message(val content: String) diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt new file mode 100644 index 000000000..ecd730d18 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/MessageListener.kt @@ -0,0 +1,17 @@ +package com.baeldung.redispubsub + +import io.lettuce.core.pubsub.RedisPubSubAdapter +import java.util.concurrent.CountDownLatch + +class MessageListener : RedisPubSubAdapter() { + + var latch: CountDownLatch = CountDownLatch(1) + + var messagesReceived: List = emptyList() + override fun message(channel: String?, message: String?) { + println("Received message: $message from channel: $channel") + messagesReceived = messagesReceived.plus(message!!) + latch.countDown() + } + +} \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt new file mode 100644 index 000000000..e62c8e96c --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisConnectionManager.kt @@ -0,0 +1,26 @@ +package com.baeldung.redispubsub + +import io.lettuce.core.RedisClient +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.api.sync.RedisCommands +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands + +object RedisConnectionManager: AutoCloseable { + private val redisClient: RedisClient = RedisClient.create("redis://localhost:6379") + private val connection: StatefulRedisConnection = redisClient.connect() + + override fun close() { + connection.close() + redisClient.shutdown() + } + + fun redisSyncCommands(): RedisCommands? { + return connection.sync() + } + + fun redisPubSubAsyncCommands(messageListener: MessageListener): RedisPubSubAsyncCommands { + val pubSubConnection = redisClient.connectPubSub() + pubSubConnection.addListener(messageListener) + return pubSubConnection.async() + } +} diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt new file mode 100644 index 000000000..8f4990a7f --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisPublisher.kt @@ -0,0 +1,9 @@ +package com.baeldung.redispubsub + +class RedisPublisher { + + fun publishMessage(channel: String, message: Message) { + RedisConnectionManager.redisSyncCommands()?.publish(channel, message.content) + println("Message published: $message") + } +} \ No newline at end of file diff --git a/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt new file mode 100644 index 000000000..09d81d311 --- /dev/null +++ b/kotlin-libraries-data/src/main/kotlin/com/baeldung/redispubsub/RedisSubscriber.kt @@ -0,0 +1,9 @@ +package com.baeldung.redispubsub + +class RedisSubscriber(private val messageListener: MessageListener) { + + fun subscribeToChannel(channel: String) { + RedisConnectionManager.redisPubSubAsyncCommands(messageListener).subscribe(channel) + } + +} diff --git a/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt b/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt new file mode 100644 index 000000000..6d3715def --- /dev/null +++ b/kotlin-libraries-data/src/test/kotlin/com/baeldung/redispubsub/RedisSubscriberUnitTest.kt @@ -0,0 +1,41 @@ +package com.baeldung.redispubsub + +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import redis.embedded.RedisServer +import java.util.concurrent.TimeUnit + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class RedisSubscriberUnitTest { + + val messageListener = MessageListener() + val redisSubscriber = RedisSubscriber(messageListener) + val redisPublisher = RedisPublisher() + val channel = "channel" + val message = Message("Hello, Redis!") + + val redisServer = RedisServer(6379) + + @BeforeAll + fun setUp() { + redisServer.start() + } + + @AfterAll + fun tearDown() { + RedisConnectionManager.close() + redisServer.stop() + } + + @Test + fun givenMessageListener_whenMessagePublished_thenMessageReceived() { + redisSubscriber.subscribeToChannel(channel) + redisPublisher.publishMessage(channel, message) + messageListener.latch.await(500, TimeUnit.MILLISECONDS) + assertEquals(message.content, messageListener.messagesReceived.get(0)) + } + +} \ No newline at end of file