diff --git a/build.gradle.kts b/build.gradle.kts index 8acc4c06..314ab7da 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,5 +1,5 @@ plugins { - kotlin("jvm") version "1.9.0" + kotlin("jvm") version "1.9.20" `maven-publish` id("com.ncorti.ktfmt.gradle") version "0.12.0" } @@ -13,7 +13,7 @@ subprojects { repositories { mavenCentral() } - dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.0") } + dependencies { implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.9.20") } kotlin { jvmToolchain(17) } diff --git a/core/src/integrationTest/kotlin/com/katanox/tabour/TabourTest.kt b/core/src/integrationTest/kotlin/com/katanox/tabour/TabourTest.kt index d779e9b5..59c467e5 100644 --- a/core/src/integrationTest/kotlin/com/katanox/tabour/TabourTest.kt +++ b/core/src/integrationTest/kotlin/com/katanox/tabour/TabourTest.kt @@ -1,19 +1,28 @@ package com.katanox.tabour +import com.katanox.tabour.configuration.core.DataProductionConfiguration import com.katanox.tabour.configuration.core.tabour import com.katanox.tabour.configuration.sqs.sqsConsumer import com.katanox.tabour.configuration.sqs.sqsConsumerConfiguration import com.katanox.tabour.configuration.sqs.sqsProducer import com.katanox.tabour.configuration.sqs.sqsRegistry import com.katanox.tabour.configuration.sqs.sqsRegistryConfiguration +import com.katanox.tabour.error.ProducerNotFound +import com.katanox.tabour.error.ProductionResourceNotFound +import com.katanox.tabour.error.RegistryNotFound import com.katanox.tabour.sqs.production.FifoQueueData import com.katanox.tabour.sqs.production.NonFifoQueueData +import com.katanox.tabour.sqs.production.SqsDataForProduction +import com.katanox.tabour.sqs.production.SqsMessageProduced import java.net.URL import java.time.Duration +import kotlin.test.DefaultAsserter.assertNotNull import kotlin.test.assertEquals +import kotlin.test.assertNotEquals import kotlin.test.assertTrue import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest import org.awaitility.kotlin.await import org.awaitility.kotlin.withPollDelay @@ -103,6 +112,12 @@ class TabourTest { val sqsRegistry = sqsRegistry(config) var counter = 0 + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { NonFifoQueueData("this is a test message") }, + dataProduced = { _, _ -> }, + resourceNotFound = { _ -> println("Resource not found") } + ) val producer = sqsProducer(URL(nonFifoQueueUrl), "test-producer") { onError = { println(it) } } @@ -123,12 +138,11 @@ class TabourTest { container.register(sqsRegistry) container.start() - container.produceMessage("test-registry", "test-producer") { - NonFifoQueueData("this is a test message") - } + container.produceMessage("test-registry", "test-producer", sqsProducerConfiguration) - // after 2 seconds, assert that we fetched the 1 message we produced earlier - await.withPollDelay(Duration.ofSeconds(2)).untilAsserted { assertEquals(1, counter) } + await.withPollDelay(Duration.ofSeconds(3)).untilAsserted { assertTrue { counter > 1 } } + + container.stop() purgeQueue(nonFifoQueueUrl) } @@ -171,14 +185,24 @@ class TabourTest { container.register(sqsRegistry) container.start() + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { NonFifoQueueData("this is a test message") }, + dataProduced = { _, _ -> }, + resourceNotFound = { _ -> println("Resource not found") } + ) + repeat(50) { - container.produceMessage("test-registry", "test-producer") { - NonFifoQueueData("this is a test message - $it") - } + container.produceMessage("test-registry", "test-producer", sqsProducerConfiguration) } - // we assert that in 1 second all (50) messages will be consumed by 5 workers - await.withPollDelay(Duration.ofSeconds(1)).untilAsserted { assertEquals(50, counter) } + // we assert that more than 10 messages were consumed in 2 second with multiple + // consumers + container.stop() + await.withPollDelay(Duration.ofSeconds(3)).untilAsserted { + println(counter) + assertTrue { counter > 10 } + } } @Test @@ -197,6 +221,12 @@ class TabourTest { } val sqsRegistry = sqsRegistry(config) + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { NonFifoQueueData("this is a test message") }, + dataProduced = { _, _ -> }, + resourceNotFound = { _ -> println("Resource not found") } + ) val producer = sqsProducer(URL(nonFifoQueueUrl), "test-producer") { onError = { println(it) } } @@ -205,9 +235,7 @@ class TabourTest { container.register(sqsRegistry) container.start() - container.produceMessage("test-registry", "test-producer") { - NonFifoQueueData("this is a test message") - } + container.produceMessage("test-registry", "test-producer", sqsProducerConfiguration) await .withPollInterval(Duration.ofMillis(500)) @@ -228,6 +256,7 @@ class TabourTest { ) } + container.stop() purgeQueue(nonFifoQueueUrl) } @@ -247,6 +276,12 @@ class TabourTest { } val sqsRegistry = sqsRegistry(config) + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { FifoQueueData("this is a fifo test message", "group1") }, + dataProduced = { _, _ -> }, + resourceNotFound = { _ -> println("Resource not found") } + ) val producer = sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } } @@ -255,9 +290,11 @@ class TabourTest { container.register(sqsRegistry) container.start() - container.produceMessage("test-registry", "fifo-test-producer") { - FifoQueueData("this is a fifo test message", "group1") - } + container.produceMessage( + "test-registry", + "fifo-test-producer", + sqsProducerConfiguration + ) await .withPollInterval(Duration.ofMillis(500)) @@ -278,6 +315,148 @@ class TabourTest { ) } purgeQueue(fifoQueueUrl) + container.stop() + } + + @Test + @Tag("sqs-producer-test") + fun `successful production triggers dataProduced function`() = + runTest(UnconfinedTestDispatcher()) { + val container = tabour { numOfThreads = 1 } + val config = + sqsRegistryConfiguration( + "test-registry", + StaticCredentialsProvider.create(credentials), + Region.of(localstack.region) + ) { + this.endpointOverride = + localstack.getEndpointOverride(LocalStackContainer.Service.SQS) + } + + val sqsRegistry = sqsRegistry(config) + var expectedProduceData: SqsDataForProduction? = null + var producedDataEvent: SqsMessageProduced? = null + + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { FifoQueueData("this is a fifo test message", "group1") }, + dataProduced = { data, event -> + expectedProduceData = data + producedDataEvent = event + }, + resourceNotFound = { _ -> println("Resource not found") } + ) + + val producer = + sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } } + + sqsRegistry.addProducer(producer) + container.register(sqsRegistry) + container.start() + + container.produceMessage( + "test-registry", + "fifo-test-producer", + sqsProducerConfiguration + ) + + purgeQueue(fifoQueueUrl) + container.stop() + + await.withPollDelay(Duration.ofSeconds(1)).untilAsserted { + assertEquals( + FifoQueueData("this is a fifo test message", "group1"), + expectedProduceData + ) + assertNotNull("Message group id is null", producedDataEvent?.messageGroupId) + assertNotEquals("", producedDataEvent?.messageGroupId) + } + } + + @Test + @Tag("sqs-producer-test") + fun `produce a message with wrong registry key triggers resource not found error`() = + runTest(UnconfinedTestDispatcher()) { + val container = tabour { numOfThreads = 1 } + val config = + sqsRegistryConfiguration( + "test-registry", + StaticCredentialsProvider.create(credentials), + Region.of(localstack.region) + ) { + this.endpointOverride = + localstack.getEndpointOverride(LocalStackContainer.Service.SQS) + } + + val sqsRegistry = sqsRegistry(config) + var resourceNotFound: ProductionResourceNotFound? = null + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { FifoQueueData("this is a fifo test message", "group1") }, + dataProduced = { _, _ -> }, + resourceNotFound = { error -> resourceNotFound = error } + ) + + val producer = + sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } } + + sqsRegistry.addProducer(producer) + container.register(sqsRegistry) + container.start() + + container.produceMessage( + "wrong-registry", + "fifo-test-producer", + sqsProducerConfiguration + ) + + advanceUntilIdle() + container.stop() + + await.withPollDelay(Duration.ofSeconds(1)).untilAsserted { + assertEquals(RegistryNotFound("wrong-registry"), resourceNotFound) + } + } + + @Test + @Tag("sqs-producer-test") + fun `produce a message with wrong producer key triggers resource not found error`() = + runTest(UnconfinedTestDispatcher()) { + val container = tabour { numOfThreads = 1 } + val config = + sqsRegistryConfiguration( + "test-registry", + StaticCredentialsProvider.create(credentials), + Region.of(localstack.region) + ) { + this.endpointOverride = + localstack.getEndpointOverride(LocalStackContainer.Service.SQS) + } + + val sqsRegistry = sqsRegistry(config) + var resourceNotFound: ProductionResourceNotFound? = null + val sqsProducerConfiguration = + DataProductionConfiguration( + produceData = { FifoQueueData("this is a fifo test message", "group1") }, + dataProduced = { _, _ -> }, + resourceNotFound = { error -> resourceNotFound = error } + ) + + val producer = + sqsProducer(URL(fifoQueueUrl), "fifo-test-producer") { onError = { println(it) } } + + sqsRegistry.addProducer(producer) + container.register(sqsRegistry) + container.start() + + container.produceMessage("test-registry", "wrong-producer", sqsProducerConfiguration) + + advanceUntilIdle() + container.stop() + + await.withPollDelay(Duration.ofSeconds(1)).untilAsserted { + assertEquals(ProducerNotFound("wrong-producer"), resourceNotFound) + } } private fun purgeQueue(url: String) { diff --git a/core/src/main/kotlin/com/katanox/tabour/Retry.kt b/core/src/main/kotlin/com/katanox/tabour/Retry.kt index 3cf1e741..df8d251b 100644 --- a/core/src/main/kotlin/com/katanox/tabour/Retry.kt +++ b/core/src/main/kotlin/com/katanox/tabour/Retry.kt @@ -2,7 +2,7 @@ package com.katanox.tabour suspend inline fun retry( repeatTimes: Int, - onError: (Exception) -> Unit, + onError: (Throwable) -> Unit, crossinline f: suspend () -> Unit ) { var tries = 0 @@ -11,7 +11,7 @@ suspend inline fun retry( try { f() break - } catch (e: Exception) { + } catch (e: Throwable) { tries++ if (tries == repeatTimes) { diff --git a/core/src/main/kotlin/com/katanox/tabour/Tabour.kt b/core/src/main/kotlin/com/katanox/tabour/Tabour.kt index f56b5e07..720453a6 100644 --- a/core/src/main/kotlin/com/katanox/tabour/Tabour.kt +++ b/core/src/main/kotlin/com/katanox/tabour/Tabour.kt @@ -2,12 +2,12 @@ package com.katanox.tabour import com.katanox.tabour.configuration.Registry import com.katanox.tabour.consumption.Config +import com.katanox.tabour.error.RegistryNotFound import com.katanox.tabour.sqs.SqsRegistry -import com.katanox.tabour.sqs.production.SqsDataForProduction +import com.katanox.tabour.sqs.production.SqsDataProductionConfiguration import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.newFixedThreadPoolContext @@ -50,21 +50,20 @@ class Tabour internal constructor(val config: Configuration) { * * [producerKey]: the key of the producer itself * - * [produceFn]: A function that returns a Pair. The first part of the pair is - * the body of the message and the second part is the message group id. If the body is null, a - * message is not produced + * [f]: A function that returns a Pair. The first part of the pair is the body + * of the message and the second part is the message group id. If the body is null, a message is + * not produced * * Note: If the registry is not found (either wrong Registry or Producer key), nothing happens. */ suspend fun produceMessage( registryKey: K, producerKey: T, - produceFn: () -> SqsDataForProduction + f: SqsDataProductionConfiguration ) { when (val registry = registries.find { it.key == registryKey }) { - is SqsRegistry -> - scope.launch { coroutineScope { registry.produce(producerKey, produceFn) } } - else -> {} + is SqsRegistry -> scope.launch { registry.produce(producerKey, f) } + else -> f.resourceNotFound(RegistryNotFound(registryKey)) } } diff --git a/core/src/main/kotlin/com/katanox/tabour/configuration/core/Core.kt b/core/src/main/kotlin/com/katanox/tabour/configuration/core/Core.kt index 4403ee31..f02db984 100644 --- a/core/src/main/kotlin/com/katanox/tabour/configuration/core/Core.kt +++ b/core/src/main/kotlin/com/katanox/tabour/configuration/core/Core.kt @@ -2,9 +2,16 @@ package com.katanox.tabour.configuration.core import com.katanox.tabour.Tabour import com.katanox.tabour.consumption.Config +import com.katanox.tabour.error.ProductionResourceNotFound /** Creates a new [Tabour] instance */ fun tabour(init: Tabour.Configuration.() -> Unit): Tabour = Tabour(config(Tabour.Configuration(), init)) internal fun config(conf: T, init: T.() -> Unit): T = conf.apply { init() } + +data class DataProductionConfiguration( + val produceData: () -> T, + val dataProduced: (T, K) -> Unit, + val resourceNotFound: (ProductionResourceNotFound) -> Unit +) diff --git a/core/src/main/kotlin/com/katanox/tabour/consumption/ConsumptionError.kt b/core/src/main/kotlin/com/katanox/tabour/consumption/ConsumptionError.kt index 10166ed5..6e974b15 100644 --- a/core/src/main/kotlin/com/katanox/tabour/consumption/ConsumptionError.kt +++ b/core/src/main/kotlin/com/katanox/tabour/consumption/ConsumptionError.kt @@ -8,5 +8,5 @@ sealed interface ConsumptionError { data class AwsError(val details: AwsErrorDetails) : ConsumptionError data class AwsSdkClientError(val exception: SdkClientException) : ConsumptionError data class UnsuccessfulConsumption(val message: Message) : ConsumptionError - data class UnrecognizedError(val exception: Exception) : ConsumptionError + data class UnrecognizedError(val error: Throwable) : ConsumptionError } diff --git a/core/src/main/kotlin/com/katanox/tabour/error/TabourErrors.kt b/core/src/main/kotlin/com/katanox/tabour/error/TabourErrors.kt new file mode 100644 index 00000000..a512bc69 --- /dev/null +++ b/core/src/main/kotlin/com/katanox/tabour/error/TabourErrors.kt @@ -0,0 +1,7 @@ +package com.katanox.tabour.error + +sealed interface ProductionResourceNotFound + +data class RegistryNotFound(val registryKey: T) : ProductionResourceNotFound + +data class ProducerNotFound(val producerKey: T) : ProductionResourceNotFound diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/SqsRegistry.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/SqsRegistry.kt index 17cc819b..99a55bec 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/SqsRegistry.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/SqsRegistry.kt @@ -2,9 +2,10 @@ package com.katanox.tabour.sqs import com.katanox.tabour.configuration.Registry import com.katanox.tabour.consumption.Config +import com.katanox.tabour.error.ProducerNotFound import com.katanox.tabour.sqs.config.SqsConsumer import com.katanox.tabour.sqs.consumption.SqsPoller -import com.katanox.tabour.sqs.production.SqsDataForProduction +import com.katanox.tabour.sqs.production.SqsDataProductionConfiguration import com.katanox.tabour.sqs.production.SqsProducer import com.katanox.tabour.sqs.production.SqsProducerExecutor import java.net.URI @@ -71,11 +72,13 @@ internal constructor( sqsPoller.stopPolling() } - suspend fun produce(producerKey: T, produceFn: () -> SqsDataForProduction) { + suspend fun produce(producerKey: T, f: SqsDataProductionConfiguration) { val producer = producers.find { it.key == producerKey } if (producer != null) { - sqsProducerExecutor.produce(producer, produceFn) + sqsProducerExecutor.produce(producer, f) + } else { + f.resourceNotFound(ProducerNotFound(producerKey)) } } diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/config/SqsPipeline.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/config/SqsPipeline.kt index a73819c1..fc97b3c3 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/config/SqsPipeline.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/config/SqsPipeline.kt @@ -1,6 +1,7 @@ package com.katanox.tabour.sqs.config import com.katanox.tabour.consumption.Config +import com.katanox.tabour.error.ProductionResourceNotFound import com.katanox.tabour.sqs.production.NonFifoQueueData import com.katanox.tabour.sqs.production.SqsDataForProduction import com.katanox.tabour.sqs.production.SqsProducer @@ -14,4 +15,6 @@ class SqsPipeline internal constructor() : Config { * consumed sqs message */ var transformer: (Message) -> SqsDataForProduction = { NonFifoQueueData(message = null) } + + val failedHandler: (ProductionResourceNotFound) -> Unit = {} } diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/consumption/SqsPoller.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/consumption/SqsPoller.kt index 9e41b609..0a486d51 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/consumption/SqsPoller.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/consumption/SqsPoller.kt @@ -1,8 +1,11 @@ package com.katanox.tabour.sqs.consumption +import com.katanox.tabour.configuration.core.DataProductionConfiguration import com.katanox.tabour.consumption.ConsumptionError import com.katanox.tabour.retry import com.katanox.tabour.sqs.config.SqsConsumer +import com.katanox.tabour.sqs.production.SqsDataForProduction +import com.katanox.tabour.sqs.production.SqsMessageProduced import com.katanox.tabour.sqs.production.SqsProducerExecutor import java.net.URL import kotlinx.coroutines.coroutineScope @@ -71,7 +74,7 @@ internal class SqsPoller(private val sqs: SqsClient, private val executor: SqsPr pipeline?.producer?.let { var consumedFromPipeline = false - executor.produce(it) { + val produceData: () -> SqsDataForProduction = { pipeline.transformer(message).also { transformationResult -> consumedFromPipeline = @@ -79,6 +82,22 @@ internal class SqsPoller(private val sqs: SqsClient, private val executor: SqsPr } } + val messageProduced: + (SqsDataForProduction, SqsMessageProduced) -> Unit = + { _, _ -> + Unit + } + val failedToProduceData = pipeline.failedHandler + + executor.produce( + it, + DataProductionConfiguration( + produceData, + messageProduced, + failedToProduceData + ) + ) + consumedFromPipeline } ?: consumer.onSuccess(message) diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/production/Events.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/production/Events.kt new file mode 100644 index 00000000..cc1f69eb --- /dev/null +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/production/Events.kt @@ -0,0 +1,5 @@ +package com.katanox.tabour.sqs.production + +import java.time.Instant + +data class SqsMessageProduced(val messageGroupId: String, val timestamp: Instant) diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/production/ProducerError.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/production/ProducerError.kt index 4685e056..d55aed56 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/production/ProducerError.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/production/ProducerError.kt @@ -1,3 +1,17 @@ package com.katanox.tabour.sqs.production -data class ProducerError(val message: String, val producerKey: T) +import java.net.URL +import software.amazon.awssdk.awscore.exception.AwsErrorDetails +import software.amazon.awssdk.core.exception.SdkClientException + +sealed interface ProductionError { + data class EmptyUrl(val url: URL) : ProductionError + + data class EmptyMessage(val message: T) : ProductionError + + data class UnrecognizedError(val error: Throwable) : ProductionError + + data class AwsError(val details: AwsErrorDetails) : ProductionError + + data class AwsSdkClientError(val exception: SdkClientException) : ProductionError +} diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducer.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducer.kt index 79cfe397..e59a9f0b 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducer.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducer.kt @@ -14,7 +14,7 @@ internal constructor( val queueUrl: URL ) : Config, TabourProducer { - override var onError: (ProducerError) -> Unit = {} + override var onError: (ProductionError) -> Unit = {} var config: SqsProducerConfiguration = sqsProducerConfiguration { retries = 1 } } diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerConfiguration.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerConfiguration.kt index 0789fb96..56e9fd6b 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerConfiguration.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerConfiguration.kt @@ -1,8 +1,12 @@ package com.katanox.tabour.sqs.production +import com.katanox.tabour.configuration.core.DataProductionConfiguration import com.katanox.tabour.consumption.Config class SqsProducerConfiguration internal constructor() : Config { /** How many times the producer will try to produce a message */ var retries: Int = 1 } + +typealias SqsDataProductionConfiguration = + DataProductionConfiguration diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerExecutor.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerExecutor.kt index c70419ca..31f90804 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerExecutor.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/production/SqsProducerExecutor.kt @@ -1,12 +1,15 @@ package com.katanox.tabour.sqs.production import com.katanox.tabour.retry +import java.time.Instant +import software.amazon.awssdk.awscore.exception.AwsServiceException +import software.amazon.awssdk.core.exception.SdkClientException import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.SendMessageRequest internal class SqsProducerExecutor(private val sqs: SqsClient) { - suspend fun produce(producer: SqsProducer, produceFn: () -> SqsDataForProduction) { - val produceData = produceFn() + suspend fun produce(producer: SqsProducer, f: SqsDataProductionConfiguration) { + val produceData = f.produceData() val url = producer.queueUrl.toString() @@ -28,16 +31,31 @@ internal class SqsProducerExecutor(private val sqs: SqsClient) { retry( producer.config.retries, { - producer.onError( - ProducerError( - producerKey = producer.key, - message = it.message - ?: "Unknown exception during production (producer key: [${producer.key}])" - ) - ) + when (it) { + is AwsServiceException -> + producer.onError( + ProductionError.AwsError(details = it.awsErrorDetails()) + ) + is SdkClientException -> + producer.onError(ProductionError.AwsSdkClientError(it)) + else -> producer.onError(ProductionError.UnrecognizedError(it)) + } } ) { - sqs.sendMessage(request) + val response = sqs.sendMessage(request) + + if (response.messageId().isNotEmpty()) { + f.dataProduced( + produceData, + SqsMessageProduced(response.messageId(), Instant.now()) + ) + } + } + } else { + when { + url.isEmpty() -> producer.onError(ProductionError.EmptyUrl(producer.queueUrl)) + produceData.message.isNullOrEmpty() -> + producer.onError(ProductionError.EmptyMessage(produceData)) } } } diff --git a/core/src/main/kotlin/com/katanox/tabour/sqs/production/TabourProducer.kt b/core/src/main/kotlin/com/katanox/tabour/sqs/production/TabourProducer.kt index ae8797f3..124faac4 100644 --- a/core/src/main/kotlin/com/katanox/tabour/sqs/production/TabourProducer.kt +++ b/core/src/main/kotlin/com/katanox/tabour/sqs/production/TabourProducer.kt @@ -2,5 +2,5 @@ package com.katanox.tabour.sqs.production interface TabourProducer { val key: K - var onError: (ProducerError) -> Unit + var onError: (ProductionError) -> Unit } diff --git a/settings.gradle.kts b/settings.gradle.kts index dc8f3790..8beaac06 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -8,7 +8,7 @@ rootProject.name = "tabour" include("core", "proto", "spring") -val kotlinVersion = "1.9.0" +val kotlinVersion = "1.9.20" val coroutinesVersion = "1.7.3" dependencyResolutionManagement { @@ -29,7 +29,7 @@ dependencyResolutionManagement { library("mockk", "io.mockk:mockk:1.13.5") library( "kotlin-test-coroutines", - "org.jetbrains.kotlinx:kotlinx-coroutines-test:1.6.4" + "org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutinesVersion" ) bundle(