Skip to content

Commit

Permalink
add triggers & update deps
Browse files Browse the repository at this point in the history
  • Loading branch information
gpopides committed Dec 2, 2023
1 parent 0e4067b commit 4f4487a
Show file tree
Hide file tree
Showing 17 changed files with 307 additions and 49 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
plugins {
kotlin("jvm") version "1.9.0"
kotlin("jvm") version "1.9.20"
`maven-publish`
id("com.ncorti.ktfmt.gradle") version "0.12.0"
}
Expand All @@ -13,7 +13,7 @@ subprojects {

repositories { mavenCentral() }

dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.0") }
dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.20") }

kotlin { jvmToolchain(17) }

Expand Down
211 changes: 195 additions & 16 deletions core/src/integrationTest/kotlin/com/katanox/tabour/TabourTest.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
package com.katanox.tabour

import com.katanox.tabour.configuration.core.DataProductionConfiguration
import com.katanox.tabour.configuration.core.tabour
import com.katanox.tabour.configuration.sqs.sqsConsumer
import com.katanox.tabour.configuration.sqs.sqsConsumerConfiguration
import com.katanox.tabour.configuration.sqs.sqsProducer
import com.katanox.tabour.configuration.sqs.sqsRegistry
import com.katanox.tabour.configuration.sqs.sqsRegistryConfiguration
import com.katanox.tabour.error.ProducerNotFound
import com.katanox.tabour.error.ProductionResourceNotFound
import com.katanox.tabour.error.RegistryNotFound
import com.katanox.tabour.sqs.production.FifoQueueData
import com.katanox.tabour.sqs.production.NonFifoQueueData
import com.katanox.tabour.sqs.production.SqsDataForProduction
import com.katanox.tabour.sqs.production.SqsMessageProduced
import java.net.URL
import java.time.Duration
import kotlin.test.DefaultAsserter.assertNotNull
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertTrue
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import org.awaitility.kotlin.await
import org.awaitility.kotlin.withPollDelay
Expand Down Expand Up @@ -103,6 +112,12 @@ class TabourTest {

val sqsRegistry = sqsRegistry(config)
var counter = 0
val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { NonFifoQueueData("this is a test message") },
dataProduced = { _, _ -> },
resourceNotFound = { _ -> println("Resource not found") }
)

val producer =
sqsProducer(URL(nonFifoQueueUrl), "test-producer") { onError = { println(it) } }
Expand All @@ -123,12 +138,11 @@ class TabourTest {
container.register(sqsRegistry)
container.start()

container.produceMessage("test-registry", "test-producer") {
NonFifoQueueData("this is a test message")
}
container.produceMessage("test-registry", "test-producer", sqsProducerConfiguration)

// after 2 seconds, assert that we fetched the 1 message we produced earlier
await.withPollDelay(Duration.ofSeconds(2)).untilAsserted { assertEquals(1, counter) }
await.withPollDelay(Duration.ofSeconds(3)).untilAsserted { assertTrue { counter > 1 } }

container.stop()
purgeQueue(nonFifoQueueUrl)
}

Expand Down Expand Up @@ -171,14 +185,24 @@ class TabourTest {
container.register(sqsRegistry)
container.start()

val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { NonFifoQueueData("this is a test message") },
dataProduced = { _, _ -> },
resourceNotFound = { _ -> println("Resource not found") }
)

repeat(50) {
container.produceMessage("test-registry", "test-producer") {
NonFifoQueueData("this is a test message - $it")
}
container.produceMessage("test-registry", "test-producer", sqsProducerConfiguration)
}

// we assert that in 1 second all (50) messages will be consumed by 5 workers
await.withPollDelay(Duration.ofSeconds(1)).untilAsserted { assertEquals(50, counter) }
// we assert that more than 10 messages were consumed in 2 second with multiple
// consumers
container.stop()
await.withPollDelay(Duration.ofSeconds(3)).untilAsserted {
println(counter)
assertTrue { counter > 10 }
}
}

@Test
Expand All @@ -197,6 +221,12 @@ class TabourTest {
}

val sqsRegistry = sqsRegistry(config)
val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { NonFifoQueueData("this is a test message") },
dataProduced = { _, _ -> },
resourceNotFound = { _ -> println("Resource not found") }
)

val producer =
sqsProducer(URL(nonFifoQueueUrl), "test-producer") { onError = { println(it) } }
Expand All @@ -205,9 +235,7 @@ class TabourTest {
container.register(sqsRegistry)
container.start()

container.produceMessage("test-registry", "test-producer") {
NonFifoQueueData("this is a test message")
}
container.produceMessage("test-registry", "test-producer", sqsProducerConfiguration)

await
.withPollInterval(Duration.ofMillis(500))
Expand All @@ -228,6 +256,7 @@ class TabourTest {
)
}

container.stop()
purgeQueue(nonFifoQueueUrl)
}

Expand All @@ -247,6 +276,12 @@ class TabourTest {
}

val sqsRegistry = sqsRegistry(config)
val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { FifoQueueData("this is a fifo test message", "group1") },
dataProduced = { _, _ -> },
resourceNotFound = { _ -> println("Resource not found") }
)

val producer =
sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } }
Expand All @@ -255,9 +290,11 @@ class TabourTest {
container.register(sqsRegistry)
container.start()

container.produceMessage("test-registry", "fifo-test-producer") {
FifoQueueData("this is a fifo test message", "group1")
}
container.produceMessage(
"test-registry",
"fifo-test-producer",
sqsProducerConfiguration
)

await
.withPollInterval(Duration.ofMillis(500))
Expand All @@ -278,6 +315,148 @@ class TabourTest {
)
}
purgeQueue(fifoQueueUrl)
container.stop()
}

@Test
@Tag("sqs-producer-test")
fun `successful production triggers dataProduced function`() =
runTest(UnconfinedTestDispatcher()) {
val container = tabour { numOfThreads = 1 }
val config =
sqsRegistryConfiguration(
"test-registry",
StaticCredentialsProvider.create(credentials),
Region.of(localstack.region)
) {
this.endpointOverride =
localstack.getEndpointOverride(LocalStackContainer.Service.SQS)
}

val sqsRegistry = sqsRegistry(config)
var expectedProduceData: SqsDataForProduction? = null
var producedDataEvent: SqsMessageProduced? = null

val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { FifoQueueData("this is a fifo test message", "group1") },
dataProduced = { data, event ->
expectedProduceData = data
producedDataEvent = event
},
resourceNotFound = { _ -> println("Resource not found") }
)

val producer =
sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } }

sqsRegistry.addProducer(producer)
container.register(sqsRegistry)
container.start()

container.produceMessage(
"test-registry",
"fifo-test-producer",
sqsProducerConfiguration
)

purgeQueue(fifoQueueUrl)
container.stop()

await.withPollDelay(Duration.ofSeconds(1)).untilAsserted {
assertEquals(
FifoQueueData("this is a fifo test message", "group1"),
expectedProduceData
)
assertNotNull("Message group id is null", producedDataEvent?.messageGroupId)
assertNotEquals("", producedDataEvent?.messageGroupId)
}
}

@Test
@Tag("sqs-producer-test")
fun `produce a message with wrong registry key triggers resource not found error`() =
runTest(UnconfinedTestDispatcher()) {
val container = tabour { numOfThreads = 1 }
val config =
sqsRegistryConfiguration(
"test-registry",
StaticCredentialsProvider.create(credentials),
Region.of(localstack.region)
) {
this.endpointOverride =
localstack.getEndpointOverride(LocalStackContainer.Service.SQS)
}

val sqsRegistry = sqsRegistry(config)
var resourceNotFound: ProductionResourceNotFound? = null
val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { FifoQueueData("this is a fifo test message", "group1") },
dataProduced = { _, _ -> },
resourceNotFound = { error -> resourceNotFound = error }
)

val producer =
sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } }

sqsRegistry.addProducer(producer)
container.register(sqsRegistry)
container.start()

container.produceMessage(
"wrong-registry",
"fifo-test-producer",
sqsProducerConfiguration
)

advanceUntilIdle()
container.stop()

await.withPollDelay(Duration.ofSeconds(1)).untilAsserted {
assertEquals(RegistryNotFound("wrong-registry"), resourceNotFound)
}
}

@Test
@Tag("sqs-producer-test")
fun `produce a message with wrong producer key triggers resource not found error`() =
runTest(UnconfinedTestDispatcher()) {
val container = tabour { numOfThreads = 1 }
val config =
sqsRegistryConfiguration(
"test-registry",
StaticCredentialsProvider.create(credentials),
Region.of(localstack.region)
) {
this.endpointOverride =
localstack.getEndpointOverride(LocalStackContainer.Service.SQS)
}

val sqsRegistry = sqsRegistry(config)
var resourceNotFound: ProductionResourceNotFound? = null
val sqsProducerConfiguration =
DataProductionConfiguration<SqsDataForProduction, SqsMessageProduced>(
produceData = { FifoQueueData("this is a fifo test message", "group1") },
dataProduced = { _, _ -> },
resourceNotFound = { error -> resourceNotFound = error }
)

val producer =
sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } }

sqsRegistry.addProducer(producer)
container.register(sqsRegistry)
container.start()

container.produceMessage("test-registry", "wrong-producer", sqsProducerConfiguration)

advanceUntilIdle()
container.stop()

await.withPollDelay(Duration.ofSeconds(1)).untilAsserted {
assertEquals(ProducerNotFound("wrong-producer"), resourceNotFound)
}
}

private fun purgeQueue(url: String) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/kotlin/com/katanox/tabour/Retry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.katanox.tabour

suspend inline fun retry(
repeatTimes: Int,
onError: (Exception) -> Unit,
onError: (Throwable) -> Unit,
crossinline f: suspend () -> Unit
) {
var tries = 0
Expand All @@ -11,7 +11,7 @@ suspend inline fun retry(
try {
f()
break
} catch (e: Exception) {
} catch (e: Throwable) {
tries++

if (tries == repeatTimes) {
Expand Down
17 changes: 8 additions & 9 deletions core/src/main/kotlin/com/katanox/tabour/Tabour.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package com.katanox.tabour

import com.katanox.tabour.configuration.Registry
import com.katanox.tabour.consumption.Config
import com.katanox.tabour.error.RegistryNotFound
import com.katanox.tabour.sqs.SqsRegistry
import com.katanox.tabour.sqs.production.SqsDataForProduction
import com.katanox.tabour.sqs.production.SqsDataProductionConfiguration
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.newFixedThreadPoolContext

Expand Down Expand Up @@ -50,21 +50,20 @@ class Tabour internal constructor(val config: Configuration) {
*
* [producerKey]: the key of the producer itself
*
* [produceFn]: A function that returns a Pair<String?, String>. The first part of the pair is
* the body of the message and the second part is the message group id. If the body is null, a
* message is not produced
* [f]: A function that returns a Pair<String?, String>. The first part of the pair is the body
* of the message and the second part is the message group id. If the body is null, a message is
* not produced
*
* Note: If the registry is not found (either wrong Registry or Producer key), nothing happens.
*/
suspend fun <T, K> produceMessage(
registryKey: K,
producerKey: T,
produceFn: () -> SqsDataForProduction
f: SqsDataProductionConfiguration
) {
when (val registry = registries.find { it.key == registryKey }) {
is SqsRegistry ->
scope.launch { coroutineScope { registry.produce(producerKey, produceFn) } }
else -> {}
is SqsRegistry -> scope.launch { registry.produce(producerKey, f) }
else -> f.resourceNotFound(RegistryNotFound(registryKey))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ package com.katanox.tabour.configuration.core

import com.katanox.tabour.Tabour
import com.katanox.tabour.consumption.Config
import com.katanox.tabour.error.ProductionResourceNotFound

/** Creates a new [Tabour] instance */
fun tabour(init: Tabour.Configuration.() -> Unit): Tabour =
Tabour(config(Tabour.Configuration(), init))

internal fun <T : Config> config(conf: T, init: T.() -> Unit): T = conf.apply { init() }

data class DataProductionConfiguration<T, K>(
val produceData: () -> T,
val dataProduced: (T, K) -> Unit,
val resourceNotFound: (ProductionResourceNotFound) -> Unit
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ sealed interface ConsumptionError {
data class AwsError(val details: AwsErrorDetails) : ConsumptionError
data class AwsSdkClientError(val exception: SdkClientException) : ConsumptionError
data class UnsuccessfulConsumption(val message: Message) : ConsumptionError
data class UnrecognizedError(val exception: Exception) : ConsumptionError
data class UnrecognizedError(val error: Throwable) : ConsumptionError
}
Loading

0 comments on commit 4f4487a

Please sign in to comment.