Skip to content

Commit

Permalink
NotificationService: Run notifying users as independent task
Browse files Browse the repository at this point in the history
Signed-off-by: Shashank Verma <[email protected]>
  • Loading branch information
shank03 committed Dec 19, 2023
1 parent cb7108b commit 1a015bc
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 81 deletions.
170 changes: 95 additions & 75 deletions src/main/kotlin/com/mnnit/moticlubs/service/NotificationService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import com.mnnit.moticlubs.repository.PostRepository
import com.mnnit.moticlubs.repository.ReplyRepository
import com.mnnit.moticlubs.repository.UserRepository
import com.mnnit.moticlubs.utils.ServiceLogger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono
import reactor.util.function.Tuples
Expand All @@ -40,86 +43,103 @@ class NotificationService(
private val LOGGER = ServiceLogger.getLogger(NotificationService::class.java)
}

fun notifyPost(post: Post, updated: Boolean): Mono<Void> = Mono
.zip(
channelRepository.findById(post.chid),
userRepository.findById(post.uid),
)
.flatMap { tuple ->
val channel = tuple.t1
val user = tuple.t2

clubRepository.findById(channel.cid)
.flatMap { club ->
Mono.just(
getPostPayload(post, user, club, channel).apply {
this["type"] = Type.POST.ordinal.toString()
this["updated"] = updated.toString()
},
)
}
.flatMap { payload ->
if (channel.private) {
LOGGER.info("notifyPost -> notifySubscribers")
notifyMembers(channel.chid, payload)
} else {
LOGGER.info("notifyPost -> notifyAll")
notifyAll(payload)
}
}
}

fun notifyDeletePost(post: Post): Mono<Void> = Mono.just(post)
.flatMap { p ->
Mono.just(
HashMap<String, String>().apply {
this["type"] = Type.DELETE_POST.ordinal.toString()
this["pid"] = p.pid.toString()
this["chid"] = p.chid.toString()
},
fun notifyPost(post: Post, updated: Boolean) = coroutineWrapper {
Mono
.zip(
channelRepository.findById(post.chid),
userRepository.findById(post.uid),
)
}.flatMap { payload ->
LOGGER.info("notifyDeletePost: payload: $payload")
notifyAll(payload)
}

fun notifyReply(reply: Reply): Mono<Void> = Mono
.zip(
postRepository.findById(reply.pid),
userRepository.findById(reply.uid),
)
.flatMap { tuple ->
val post = tuple.t1
val user = tuple.t2
channelRepository.findById(post.chid).map { channel -> Tuples.of(post, user, channel) }
}
.flatMap { tuple ->
clubRepository.findById(tuple.t3.cid).map { club -> Tuples.of(tuple.t1, tuple.t2, tuple.t3, club) }
}
.map { getReplyPayload(it.t1, it.t2, reply, it.t4, it.t3) }
.flatMap { payload ->
LOGGER.info("notifyReply: ${reply.pid}")
notifyPostParticipants(reply.pid, payload)
}

fun notifyDeleteReply(reply: Reply): Mono<Void> = postRepository.findById(reply.pid)
.flatMap { post ->
Mono.just(
HashMap<String, String>().apply {
this["type"] = Type.DELETE_REPLY.ordinal.toString()
this["pid"] = reply.pid.toString()
this["chid"] = post.chid.toString()
this["time"] = reply.time.toString()
},
.flatMap { tuple ->
val channel = tuple.t1
val user = tuple.t2

clubRepository.findById(channel.cid)
.flatMap { club ->
Mono.just(
getPostPayload(post, user, club, channel).apply {
this["type"] = Type.POST.ordinal.toString()
this["updated"] = updated.toString()
},
)
}
.flatMap { payload ->
if (channel.private) {
LOGGER.info("notifyPost -> notifySubscribers")
notifyMembers(channel.chid, payload)
} else {
LOGGER.info("notifyPost -> notifyAll")
notifyAll(payload)
}
}
}
.subscribe()
}

fun notifyDeletePost(post: Post) = coroutineWrapper {
Mono.just(post)
.flatMap { p ->
Mono.just(
HashMap<String, String>().apply {
this["type"] = Type.DELETE_POST.ordinal.toString()
this["pid"] = p.pid.toString()
this["chid"] = p.chid.toString()
},
)
}
.flatMap { payload ->
LOGGER.info("notifyDeletePost: payload: $payload")
notifyAll(payload)
}
.subscribe()
}

fun notifyReply(reply: Reply) = coroutineWrapper {
Mono
.zip(
postRepository.findById(reply.pid),
userRepository.findById(reply.uid),
)
}
.flatMap { payload ->
LOGGER.info("notifyDeleteReply: payload: $payload")
notifyAll(payload)
}
.flatMap { tuple ->
val post = tuple.t1
val user = tuple.t2
channelRepository.findById(post.chid).map { channel -> Tuples.of(post, user, channel) }
}
.flatMap { tuple ->
clubRepository.findById(tuple.t3.cid).map { club -> Tuples.of(tuple.t1, tuple.t2, tuple.t3, club) }
}
.map { getReplyPayload(it.t1, it.t2, reply, it.t4, it.t3) }
.flatMap { payload ->
LOGGER.info("notifyReply: ${reply.pid}")
notifyPostParticipants(reply.pid, payload)
}
.subscribe()
}

fun notifyDeleteReply(reply: Reply) = coroutineWrapper {
postRepository.findById(reply.pid)
.flatMap { post ->
Mono.just(
HashMap<String, String>().apply {
this["type"] = Type.DELETE_REPLY.ordinal.toString()
this["pid"] = reply.pid.toString()
this["chid"] = post.chid.toString()
this["time"] = reply.time.toString()
},
)
}
.flatMap { payload ->
LOGGER.info("notifyDeleteReply: payload: $payload")
notifyAll(payload)
}
.subscribe()
}

// --------------------------------------------------------------------- //

private fun coroutineWrapper(func: () -> Unit) {
CoroutineScope(Dispatchers.IO).launch { func() }
}

private fun getPostPayload(
post: Post,
user: User,
Expand Down
6 changes: 3 additions & 3 deletions src/main/kotlin/com/mnnit/moticlubs/service/PostService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class PostService(
fun savePost(post: Post): Mono<Post> = postRepository.save(post)
.flatMap { savedPost ->
notificationService.notifyPost(savedPost, false)
.then(Mono.just(savedPost))
Mono.just(savedPost)
}

@Cacheable("post", key = "#chid + '_' + #pageRequest.pageNumber")
Expand All @@ -34,14 +34,14 @@ class PostService(
.updatePost(pid, dto)
.flatMap { post ->
notificationService.notifyPost(post, true)
.then(Mono.just(post))
Mono.just(post)
}

@CacheEvict(cacheNames = ["post", "replies"], allEntries = true)
fun deletePostByPid(pid: Long): Mono<Void> = postRepository
.findById(pid)
.flatMap { post ->
notificationService.notifyDeletePost(post)
postRepository.deleteById(pid)
.then(notificationService.notifyDeletePost(post))
}
}
6 changes: 3 additions & 3 deletions src/main/kotlin/com/mnnit/moticlubs/service/ReplyService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class ReplyService(
@CacheEvict("replies", allEntries = true)
fun saveReply(reply: Reply): Mono<Reply> = replyRepository.save(reply)
.flatMap { savedReply ->
notificationService.notifyReply(savedReply)
.then(Mono.just(savedReply))
notificationService.notifyReply(reply)
Mono.just(savedReply)
}

@Cacheable("replies", key = "#pid + '_' + #pageRequest.pageNumber")
Expand All @@ -35,8 +35,8 @@ class ReplyService(
if (reply.uid != uid) {
Mono.error(UnauthorizedException("User not the owner of reply"))
} else {
notificationService.notifyDeleteReply(reply)
replyRepository.deleteById(time)
.then(notificationService.notifyDeleteReply(reply))
}
}
}
22 changes: 22 additions & 0 deletions src/test/kotlin/com/mnnit/moticlubs/SampleTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.mnnit.moticlubs

import com.mnnit.moticlubs.utils.ResponseStamp
import org.junit.jupiter.api.Test
import reactor.core.publisher.Mono

class SampleTests {

Expand Down Expand Up @@ -48,4 +49,25 @@ class SampleTests {
assert(map["MEMBER:23:6"] ?: false)
assert(map["MEMBER:23"] ?: false)
}

@Test
fun testIndependentTask() {
var num = 0
Mono.just(num)
.flatMap {
Thread {
Thread.sleep(1500)
println("Thread done with $num")
}.start()

num++
println("Num: $it")
Mono.just(it)
}
.subscribe()
println("Finished with $num:")

num++
Thread.sleep(3000)
}
}

0 comments on commit 1a015bc

Please sign in to comment.