From 1a015bc2dc1ac32e9792f0b507cb9fa47d9f9497 Mon Sep 17 00:00:00 2001 From: Shashank Verma Date: Tue, 19 Dec 2023 17:33:56 +0530 Subject: [PATCH] NotificationService: Run notifying users as independent task Signed-off-by: Shashank Verma --- .../moticlubs/service/NotificationService.kt | 170 ++++++++++-------- .../mnnit/moticlubs/service/PostService.kt | 6 +- .../mnnit/moticlubs/service/ReplyService.kt | 6 +- .../kotlin/com/mnnit/moticlubs/SampleTests.kt | 22 +++ 4 files changed, 123 insertions(+), 81 deletions(-) diff --git a/src/main/kotlin/com/mnnit/moticlubs/service/NotificationService.kt b/src/main/kotlin/com/mnnit/moticlubs/service/NotificationService.kt index 0518184..47951fa 100644 --- a/src/main/kotlin/com/mnnit/moticlubs/service/NotificationService.kt +++ b/src/main/kotlin/com/mnnit/moticlubs/service/NotificationService.kt @@ -16,6 +16,9 @@ import com.mnnit.moticlubs.repository.PostRepository import com.mnnit.moticlubs.repository.ReplyRepository import com.mnnit.moticlubs.repository.UserRepository import com.mnnit.moticlubs.utils.ServiceLogger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.springframework.stereotype.Service import reactor.core.publisher.Mono import reactor.util.function.Tuples @@ -40,86 +43,103 @@ class NotificationService( private val LOGGER = ServiceLogger.getLogger(NotificationService::class.java) } - fun notifyPost(post: Post, updated: Boolean): Mono = Mono - .zip( - channelRepository.findById(post.chid), - userRepository.findById(post.uid), - ) - .flatMap { tuple -> - val channel = tuple.t1 - val user = tuple.t2 - - clubRepository.findById(channel.cid) - .flatMap { club -> - Mono.just( - getPostPayload(post, user, club, channel).apply { - this["type"] = Type.POST.ordinal.toString() - this["updated"] = updated.toString() - }, - ) - } - .flatMap { payload -> - if (channel.private) { - LOGGER.info("notifyPost -> notifySubscribers") - notifyMembers(channel.chid, payload) - } else { - LOGGER.info("notifyPost -> notifyAll") - notifyAll(payload) - } - } - } - - fun notifyDeletePost(post: Post): Mono = Mono.just(post) - .flatMap { p -> - Mono.just( - HashMap().apply { - this["type"] = Type.DELETE_POST.ordinal.toString() - this["pid"] = p.pid.toString() - this["chid"] = p.chid.toString() - }, + fun notifyPost(post: Post, updated: Boolean) = coroutineWrapper { + Mono + .zip( + channelRepository.findById(post.chid), + userRepository.findById(post.uid), ) - }.flatMap { payload -> - LOGGER.info("notifyDeletePost: payload: $payload") - notifyAll(payload) - } - - fun notifyReply(reply: Reply): Mono = Mono - .zip( - postRepository.findById(reply.pid), - userRepository.findById(reply.uid), - ) - .flatMap { tuple -> - val post = tuple.t1 - val user = tuple.t2 - channelRepository.findById(post.chid).map { channel -> Tuples.of(post, user, channel) } - } - .flatMap { tuple -> - clubRepository.findById(tuple.t3.cid).map { club -> Tuples.of(tuple.t1, tuple.t2, tuple.t3, club) } - } - .map { getReplyPayload(it.t1, it.t2, reply, it.t4, it.t3) } - .flatMap { payload -> - LOGGER.info("notifyReply: ${reply.pid}") - notifyPostParticipants(reply.pid, payload) - } - - fun notifyDeleteReply(reply: Reply): Mono = postRepository.findById(reply.pid) - .flatMap { post -> - Mono.just( - HashMap().apply { - this["type"] = Type.DELETE_REPLY.ordinal.toString() - this["pid"] = reply.pid.toString() - this["chid"] = post.chid.toString() - this["time"] = reply.time.toString() - }, + .flatMap { tuple -> + val channel = tuple.t1 + val user = tuple.t2 + + clubRepository.findById(channel.cid) + .flatMap { club -> + Mono.just( + getPostPayload(post, user, club, channel).apply { + this["type"] = Type.POST.ordinal.toString() + this["updated"] = updated.toString() + }, + ) + } + .flatMap { payload -> + if (channel.private) { + LOGGER.info("notifyPost -> notifySubscribers") + notifyMembers(channel.chid, payload) + } else { + LOGGER.info("notifyPost -> notifyAll") + notifyAll(payload) + } + } + } + .subscribe() + } + + fun notifyDeletePost(post: Post) = coroutineWrapper { + Mono.just(post) + .flatMap { p -> + Mono.just( + HashMap().apply { + this["type"] = Type.DELETE_POST.ordinal.toString() + this["pid"] = p.pid.toString() + this["chid"] = p.chid.toString() + }, + ) + } + .flatMap { payload -> + LOGGER.info("notifyDeletePost: payload: $payload") + notifyAll(payload) + } + .subscribe() + } + + fun notifyReply(reply: Reply) = coroutineWrapper { + Mono + .zip( + postRepository.findById(reply.pid), + userRepository.findById(reply.uid), ) - } - .flatMap { payload -> - LOGGER.info("notifyDeleteReply: payload: $payload") - notifyAll(payload) - } + .flatMap { tuple -> + val post = tuple.t1 + val user = tuple.t2 + channelRepository.findById(post.chid).map { channel -> Tuples.of(post, user, channel) } + } + .flatMap { tuple -> + clubRepository.findById(tuple.t3.cid).map { club -> Tuples.of(tuple.t1, tuple.t2, tuple.t3, club) } + } + .map { getReplyPayload(it.t1, it.t2, reply, it.t4, it.t3) } + .flatMap { payload -> + LOGGER.info("notifyReply: ${reply.pid}") + notifyPostParticipants(reply.pid, payload) + } + .subscribe() + } + + fun notifyDeleteReply(reply: Reply) = coroutineWrapper { + postRepository.findById(reply.pid) + .flatMap { post -> + Mono.just( + HashMap().apply { + this["type"] = Type.DELETE_REPLY.ordinal.toString() + this["pid"] = reply.pid.toString() + this["chid"] = post.chid.toString() + this["time"] = reply.time.toString() + }, + ) + } + .flatMap { payload -> + LOGGER.info("notifyDeleteReply: payload: $payload") + notifyAll(payload) + } + .subscribe() + } // --------------------------------------------------------------------- // + private fun coroutineWrapper(func: () -> Unit) { + CoroutineScope(Dispatchers.IO).launch { func() } + } + private fun getPostPayload( post: Post, user: User, diff --git a/src/main/kotlin/com/mnnit/moticlubs/service/PostService.kt b/src/main/kotlin/com/mnnit/moticlubs/service/PostService.kt index 1ffca16..8127f9f 100644 --- a/src/main/kotlin/com/mnnit/moticlubs/service/PostService.kt +++ b/src/main/kotlin/com/mnnit/moticlubs/service/PostService.kt @@ -20,7 +20,7 @@ class PostService( fun savePost(post: Post): Mono = postRepository.save(post) .flatMap { savedPost -> notificationService.notifyPost(savedPost, false) - .then(Mono.just(savedPost)) + Mono.just(savedPost) } @Cacheable("post", key = "#chid + '_' + #pageRequest.pageNumber") @@ -34,14 +34,14 @@ class PostService( .updatePost(pid, dto) .flatMap { post -> notificationService.notifyPost(post, true) - .then(Mono.just(post)) + Mono.just(post) } @CacheEvict(cacheNames = ["post", "replies"], allEntries = true) fun deletePostByPid(pid: Long): Mono = postRepository .findById(pid) .flatMap { post -> + notificationService.notifyDeletePost(post) postRepository.deleteById(pid) - .then(notificationService.notifyDeletePost(post)) } } diff --git a/src/main/kotlin/com/mnnit/moticlubs/service/ReplyService.kt b/src/main/kotlin/com/mnnit/moticlubs/service/ReplyService.kt index ff76c3e..fdc86d0 100644 --- a/src/main/kotlin/com/mnnit/moticlubs/service/ReplyService.kt +++ b/src/main/kotlin/com/mnnit/moticlubs/service/ReplyService.kt @@ -19,8 +19,8 @@ class ReplyService( @CacheEvict("replies", allEntries = true) fun saveReply(reply: Reply): Mono = replyRepository.save(reply) .flatMap { savedReply -> - notificationService.notifyReply(savedReply) - .then(Mono.just(savedReply)) + notificationService.notifyReply(reply) + Mono.just(savedReply) } @Cacheable("replies", key = "#pid + '_' + #pageRequest.pageNumber") @@ -35,8 +35,8 @@ class ReplyService( if (reply.uid != uid) { Mono.error(UnauthorizedException("User not the owner of reply")) } else { + notificationService.notifyDeleteReply(reply) replyRepository.deleteById(time) - .then(notificationService.notifyDeleteReply(reply)) } } } diff --git a/src/test/kotlin/com/mnnit/moticlubs/SampleTests.kt b/src/test/kotlin/com/mnnit/moticlubs/SampleTests.kt index d178b9d..ae36154 100644 --- a/src/test/kotlin/com/mnnit/moticlubs/SampleTests.kt +++ b/src/test/kotlin/com/mnnit/moticlubs/SampleTests.kt @@ -2,6 +2,7 @@ package com.mnnit.moticlubs import com.mnnit.moticlubs.utils.ResponseStamp import org.junit.jupiter.api.Test +import reactor.core.publisher.Mono class SampleTests { @@ -48,4 +49,25 @@ class SampleTests { assert(map["MEMBER:23:6"] ?: false) assert(map["MEMBER:23"] ?: false) } + + @Test + fun testIndependentTask() { + var num = 0 + Mono.just(num) + .flatMap { + Thread { + Thread.sleep(1500) + println("Thread done with $num") + }.start() + + num++ + println("Num: $it") + Mono.just(it) + } + .subscribe() + println("Finished with $num:") + + num++ + Thread.sleep(3000) + } }