diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 00000000..ad61a120 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,24 @@ +name: Maven build +on: [push] +jobs: + build: + runs-on: ubuntu-latest + steps: + - name: Check out repository code + uses: actions/checkout@v2 + + - name: Cache local Maven repository + uses: actions/cache@v2 + with: + path: ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Build + run: | + mvn dependency:go-offline + mvn -B clean compile + + - name: Run tests + run: mvn -B verify diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 00000000..49f246c2 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,20 @@ +name: Publish package to GitHub Packages +on: + release: + types: [created] +jobs: + publish: + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-java@v2 + with: + java-version: '11' + distribution: 'adopt' + - name: Publish package + run: mvn --batch-mode deploy + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index a1c2a238..f2f12322 100644 --- a/.gitignore +++ b/.gitignore @@ -1,23 +1,33 @@ -# Compiled class file -*.class +HELP.md +!**/src/main/** +!**/src/test/** +**/application.yml +**.DS_Store -# Log file -*.log +### MVN ### +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties -# BlueJ files -*.ctxt +# https://github.com/takari/maven-wrapper#usage-without-binary-jar +.mvn/wrapper/maven-wrapper.jar +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache -# Mobile Tools for Java (J2ME) -.mtj.tmp/ - -# Package Files # -*.jar -*.war -*.nar -*.ear -*.zip -*.tar.gz -*.rar - -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr \ No newline at end of file diff --git a/README.md b/README.md index 28b81000..1b2c6f97 100644 --- a/README.md +++ b/README.md @@ -1 +1,170 @@ -# event-stream-api \ No newline at end of file +# Tabour + +Kotlin's library to make working with queues/topics much easier. + +### Usage: + +## Installation + +First you need to get started is to add a dependency to `Tabour` library. +Then adding these to the main application of spring: +```kotlin + @ConfigurationPropertiesScan + @ComponentScan(basePackages = ["com.katanox.tabour"]) + class RandomApplication +``` + +#### Gradle/maven dependency + + + + + + +
ApproachInstruction
Maven +
<dependency>
+    <groupId>com.katanox</groupId>
+    <artifactId>tabour</artifactId>
+    <version>{version}</version>
+</dependency>
+
+ +## Supported Types +- SQS + +## Publisher example +```kotlin +class BookingEventPublisher: EventPublisher() { + + override fun getBusType(): BusType { + return BusType.SQS + } +} +``` +Then +```kotlin + bookingEventPublisher.publish(createBookingRequest.booking, "BUS_URL") +``` + +## Consumer example +In this example a protobuf message has been used. +```kotlin +class BookingEventConsumer : EventConsumer() { + + override fun consume(message: ByteArray) { + val input = ByteArrayInputStream(message) + val bookingBuilder = Booking.newBuilder() + TextFormat.merge(input.reader(), bookingBuilder) + val booking = bookingBuilder.build() + logger.info { booking } + } + + override fun getBusURL(): String { + return "BUS_URL" + } + + override fun getBusType(): BusType { + return BusType.SQS + + } +} +``` + +## Configurations + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
pathdefault valueexplanation
tabour.retry-max-count
1
Number of times that is gonna retry to publish or consume an event
tabour.handler.thread-pool-size
3
Size of the thread pool the the handler actors will use to consume the messages
tabour.handler.queue-size
10
Size of the internal queue that the threads will use to pull from once a message is consumed
tabour.handler.thread-name-prefix
""
Prefix of the threads that are created by the thread pool
tabour.poller.poll-delay
1 Second
Delay the poller should wait for the next poll after the previous poll has finished
tabour.poller.wait-time
20 Seconds
The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
tabour.poller.visibility-timeout
360 Seconds
Visibility timeout is the time-period or duration you specify for the queue item which when is fetched and processed by the consumer is made hidden from the queue and other consumers.
tabour.poller.batch-size
10
The maximum number of messages to pull from the even bus each poll: + + * event bus: + - SQS allows is maximum 10
tabour.poller.polling-threads
1
The number of threads that should poll for new messages. Each of those threads will poll a batch of batchSize messages and then wait for the pollDelay interval until polling the next batch.
tabour.sqs.access-key
NA
The AWS access key.
tabour.sqs.secret-key
NA
The AWS secret key.
tabour.sqs.region
NA
The AWS region
tabour.sqs.auto-startup
true
Configures if this listening container should be automatically started.
tabour.sqs.max-number-of-messages
10
Configure the maximum number of messages that should be retrieved during one poll to the Amazon SQS system. This number must be a positive, non-zero number that has a maximum number of 10. Values higher then 10 are currently not supported by the queueing system.
tabour.sqs.core-pool-size
1
Set the ThreadPoolExecutor's core pool size, that is being used by SQS
tabour.sqs.max-pool-size
Int.MAX_VALUE
Set the ThreadPoolExecutor's maximum pool size, that is being used by SQS
tabour.sqs.queue-capacity
Int.MAX_VALUE
Set the capacity for the ThreadPoolExecutor's BlockingQueue, that is being used by SQS Any positive value will lead to a LinkedBlockingQueue instance; Any other value will lead to a SynchronousQueue instance
tabour.sqs.enable-consumption
false
Configures if this the sqs listeners should be starting
\ No newline at end of file diff --git a/docs/maven.png b/docs/maven.png new file mode 100644 index 00000000..62be60ae Binary files /dev/null and b/docs/maven.png differ diff --git a/pom.xml b/pom.xml new file mode 100644 index 00000000..38c79fb8 --- /dev/null +++ b/pom.xml @@ -0,0 +1,447 @@ + + + 4.0.0 + com.katanox + tabour + 0.0.1 + Tabour + Kotlin's library to make working with queues/topics much easier. + + + jcenter + jcenter + https://jcenter.bintray.com + + + + + github + GitHub Packages + https://maven.pkg.github.com/katanox/tabour + + + + + The Apache License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + + + + + Ahmad Shabib + a.shabib@katanox.com + Katanox + http://www.katanox.com + + + + false + ${skipTests} + ${skipTests} + + + 11 + 1.4.31 + + + 3.0.2 + 3.0.0-M5 + 0.8.5 + 2.22.1 + + + + 2.8.6 + + 4.4.3 + + 1.15.2 + + 1.15.2 + + 2.0.6 + + 1.11.0 + 2.5.0 + 2.1.4.RELEASE + 1.3.2 + 1.6.1 + 1.5.10 + 1.5.0 + + + + + org.springframework.boot + spring-boot-autoconfigure + ${spring.boot.version} + + + org.springframework.boot + spring-boot-configuration-processor + ${spring.boot.version} + + + javax.annotation + javax.annotation-api + ${javax.annotation.version} + + + org.springframework.cloud + spring-cloud-aws-messaging + ${spring.cloud.version} + + + io.github.resilience4j + resilience4j-retry + ${resilience4j.retry.version} + + + org.jetbrains.kotlin + kotlin-reflect + ${jetbrains.kotlin.version} + + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${jetbrains.kotlin.version} + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + ${kotlin.coroutines.version} + + + com.google.code.gson + gson + ${gson.version} + + + io.github.microutils + kotlin-logging-jvm + ${kotlin-jvm-logging.version} + + + + + io.mockk + mockk + ${mockk.version} + test + + + io.kotest + kotest-assertions-core-jvm + ${kotest.version} + test + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + org.testcontainers + junit-jupiter + ${testcontainers.jupiter.version} + test + + + kotlin-test-junit + org.jetbrains.kotlin + ${kotlin.version} + test + + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/test/kotlin + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + + + + + org.jetbrains.kotlin + kotlin-maven-plugin + ${kotlin.version} + + + -Xjsr305=strict + + + spring + jpa + + 1.8 + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + org.jetbrains.kotlin + kotlin-maven-noarg + ${kotlin.version} + + + + + test-compile + + test-compile + + test-compile + + ${basedir}/target/test-classes + + src/test/kotlin + + + + + compile-integration-tests + + test-compile + + pre-integration-test + + ${basedir}/target/integration-test-classes + + + ${basedir}/src/integration-test/kotlin + + + + + + compile + compile + + compile + + + + src/main/kotlin + + + + + + + + maven-resources-plugin + ${resources.plugin.version} + + + default-testResources + + testResources + + process-test-resources + + ${basedir}/target/test-classes + + + ${basedir}/src/test/resources + + + + + + add-integration-test-resources + + copy-resources + + pre-integration-test + + ${basedir}/target/integration-test-classes + + + ${basedir}/src/integration-test/resources + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${sunfire.plugin.version} + + ${basedir}/target/test-classes + ${basedir}/src/test/kotlin + ${surefireArgLine} + false + ${skipUTs} + + + + + org.jacoco + jacoco-maven-plugin + ${jacoco.plugin.version} + + + **/model/** + + ${basedir}/target/coverage-reports/jacoco.exec + ${basedir}/target/coverage-reports/jacoco.exec + ${basedir}/target/site/jacoco + + + + pre-unit-test + + prepare-agent + + + ${project.build.directory}/coverage-reports/jacoco-ut.exec + surefireArgLine + + + + post-unit-test + test + + report + + + ${project.build.directory}/coverage-reports/jacoco-ut.exec + ${project.reporting.outputDirectory}/jacoco-ut + + + + + pre-integration-test + pre-integration-test + + prepare-agent + + + ${project.build.directory}/coverage-reports/jacoco-it.exec + failsafeArgLine + + + + post-integration-test + post-integration-test + + report + + + ${project.build.directory}/coverage-reports/jacoco-it.exec + ${project.reporting.outputDirectory}/jacoco-it + + + + jacoco-site + package + + report + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + ${failsafe.plugin.version} + + true + ${failsafeArgLine} + ${skipTests} + ${skipITs} + + + + integration-test + integration-test + + integration-test + + + + **/** + + ${project.build.outputDirectory} + ${basedir}/target/integration-test-classes + ${basedir}/src/integration-test/kotlin + + + + verify + verify + + verify + + + false + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.2.0 + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + + compile + compile + + compile + + + + testCompile + test-compile + + testCompile + + + + + + + + + diff --git a/src/main/kotlin/com/katanox/tabour/base/IEventConsumerBase.kt b/src/main/kotlin/com/katanox/tabour/base/IEventConsumerBase.kt new file mode 100644 index 00000000..209cf223 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/base/IEventConsumerBase.kt @@ -0,0 +1,3 @@ +package com.katanox.tabour.base + +interface IEventConsumerBase \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/base/IEventPublisherBase.kt b/src/main/kotlin/com/katanox/tabour/base/IEventPublisherBase.kt new file mode 100644 index 00000000..4a51a4fc --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/base/IEventPublisherBase.kt @@ -0,0 +1,9 @@ +package com.katanox.tabour.base + +import com.katanox.tabour.factory.BusType +import java.io.Serializable + +interface IEventPublisherBase { + fun getType(): BusType + fun publish(message: T, busUrl: String) +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/config/EventHandlerProperties.kt b/src/main/kotlin/com/katanox/tabour/config/EventHandlerProperties.kt new file mode 100644 index 00000000..1403b54c --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/config/EventHandlerProperties.kt @@ -0,0 +1,33 @@ +package com.katanox.tabour.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component + +@ConfigurationProperties(prefix = "tabour.handler") +@Component +data class EventHandlerProperties( + + /** + * + * Size of the thread pool the the handler actors will use to consume the messages + * + * The default is 3 threads. + */ + var threadPoolSize: Int = 3, + + /** + * + * Size of the internal queue that the threads will use to pull from once a message is consumed + * + * The default is 10. + */ + var queueSize: Int = 10, + + /** + * The prefix of the threads that are created by the thread pool + * + * + * The default is empty string. + */ + var threadNamePrefix: String = "" +) \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/config/EventPollerProperties.kt b/src/main/kotlin/com/katanox/tabour/config/EventPollerProperties.kt new file mode 100644 index 00000000..d3c8ee2b --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/config/EventPollerProperties.kt @@ -0,0 +1,61 @@ +package com.katanox.tabour.config + +import com.katanox.tabour.exception.ExceptionHandler +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component +import java.time.Duration +import java.time.temporal.ChronoUnit + + +@ConfigurationProperties(prefix = "tabour.poller") +@Component +data class EventPollerProperties( + + /** + * The delay the poller should wait for the next poll after the previous poll has finished + * + * The default is 1 second. + */ + var pollDelay: Duration = Duration.of(1, ChronoUnit.SECONDS), + + /** + * The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. + * If a message is available, the call returns sooner than WaitTimeSeconds. + * If no messages are available and the wait time expires, the call returns successfully with an empty list of messages. + * + * The default is 20 second. + */ + var waitTime: Duration = Duration.ofSeconds(20), + + /** + * Visibility timeout is the time-period for which the queue item is hidden from other consumers after being fetched + * + * + * The default is 360 second. + */ + var visibilityTimeout: Duration = Duration.ofSeconds(360), + + /** + * The maximum number of messages to pull from the even bus each poll + * event bus: + * - SQS allows a maximum of 10 + * + * + * The default is 10. + */ + var batchSize: Int = 10, + + /** + * The number of threads that should poll for new messages. Each of those threads will poll a + * batch of batchSize messages and then wait for the pollDelay interval until polling the next + * batch. + * + * + * The default is 1. + */ + var pollingThreads: Int = 1, + + + ) { + var exceptionHandler: ExceptionHandler = ExceptionHandler.defaultExceptionHandler() +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/config/TabourAutoConfigs.kt b/src/main/kotlin/com/katanox/tabour/config/TabourAutoConfigs.kt new file mode 100644 index 00000000..f66f50b0 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/config/TabourAutoConfigs.kt @@ -0,0 +1,26 @@ +package com.katanox.tabour.config + + +import io.github.resilience4j.core.IntervalFunction +import io.github.resilience4j.retry.RetryConfig +import io.github.resilience4j.retry.RetryRegistry +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + + +@Configuration(proxyBeanMethods = false) +class TabourAutoConfigs(@Autowired val tabourProperties: TabourProperties) { + + @Bean + fun retryRegistry(): RetryRegistry { + val retryConfig = RetryConfig.custom() + .maxAttempts(tabourProperties.retryMaxCount) + .intervalFunction(IntervalFunction.ofExponentialBackoff()) + .build() + return RetryRegistry.of(retryConfig) + } + +} + + diff --git a/src/main/kotlin/com/katanox/tabour/config/TabourProperties.kt b/src/main/kotlin/com/katanox/tabour/config/TabourProperties.kt new file mode 100644 index 00000000..3a2983e7 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/config/TabourProperties.kt @@ -0,0 +1,18 @@ +package com.katanox.tabour.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component + + +@ConfigurationProperties(prefix = "tabour") +@Component +data class TabourProperties( + + /** + * + * Number of times that is gonna retry to publish or consume an event + * + * The default is 1. + */ + var retryMaxCount: Int = 1 +) \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/core/EventConsumer.kt b/src/main/kotlin/com/katanox/tabour/core/EventConsumer.kt new file mode 100644 index 00000000..0278368c --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/core/EventConsumer.kt @@ -0,0 +1,27 @@ +package com.katanox.tabour.core + +import com.katanox.tabour.base.IEventConsumerBase +import com.katanox.tabour.factory.EventConsumerFactory +import com.katanox.tabour.factory.BusType +import org.springframework.beans.factory.annotation.Autowired +import javax.annotation.PostConstruct + +abstract class EventConsumer { + + private lateinit var eventConsumer: IEventConsumerBase + + @Autowired + private lateinit var eventConsumerFactory: EventConsumerFactory + + abstract fun getBusURL(): String + + abstract fun getBusType(): BusType + + abstract fun consume(message: ByteArray) + + @PostConstruct + private fun setUp() { + eventConsumer = eventConsumerFactory.getEventConsumer(getBusType(),getBusURL()) { consume(it as ByteArray) } + } + +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/core/EventPublisher.kt b/src/main/kotlin/com/katanox/tabour/core/EventPublisher.kt new file mode 100644 index 00000000..76720181 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/core/EventPublisher.kt @@ -0,0 +1,27 @@ +package com.katanox.tabour.core + +import com.katanox.tabour.base.IEventPublisherBase +import com.katanox.tabour.factory.EventPublisherFactory +import com.katanox.tabour.factory.BusType +import org.springframework.beans.factory.annotation.Autowired +import java.io.Serializable +import javax.annotation.PostConstruct + +abstract class EventPublisher { + + private lateinit var eventPublisher: IEventPublisherBase + + @Autowired + private lateinit var eventPublisherFactory: EventPublisherFactory + + abstract fun getBusType(): BusType + + @PostConstruct + private fun setUp() { + eventPublisher = eventPublisherFactory.getEventPublisher(getBusType()) + } + + open fun publish(message: T, busUrl: String) { + eventPublisher.publish(message, busUrl) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/exception/DefaultExceptionHandler.kt b/src/main/kotlin/com/katanox/tabour/exception/DefaultExceptionHandler.kt new file mode 100644 index 00000000..4f1ae3cb --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/exception/DefaultExceptionHandler.kt @@ -0,0 +1,18 @@ +package com.katanox.tabour.exception + +import com.amazonaws.services.sqs.model.Message +import mu.KotlinLogging +import org.springframework.stereotype.Component + +private val logger = KotlinLogging.logger {} + +@Component +class DefaultExceptionHandler : ExceptionHandler { + + override fun handleException(message: Message, e: Exception): ExceptionHandler.ExceptionHandlerDecision { + logger.warn( + "error while processing message ${message.messageId} - message has not been deleted from SQS and will be retried: ${e}", + ) + return ExceptionHandler.ExceptionHandlerDecision.RETRY + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/exception/ExceptionHandler.kt b/src/main/kotlin/com/katanox/tabour/exception/ExceptionHandler.kt new file mode 100644 index 00000000..4e4003ce --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/exception/ExceptionHandler.kt @@ -0,0 +1,28 @@ +package com.katanox.tabour.exception + +import com.amazonaws.services.sqs.model.Message + +interface ExceptionHandler { + enum class ExceptionHandlerDecision { + /** Delete the message from SQS. It will not be retried. */ + DELETE, + + /** + * Do not delete the message from SQS. In one of the next iterations, it will be polled by the + * poller again. + */ + RETRY + } + + /** + * Handles any exception that is thrown during message processing by an [SqsMessageHandler]. + */ + fun handleException(message: Message, e: Exception): ExceptionHandlerDecision + + companion object { + @JvmStatic + fun defaultExceptionHandler(): ExceptionHandler { + return DefaultExceptionHandler() + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/extentions/TypeAlias.kt b/src/main/kotlin/com/katanox/tabour/extentions/TypeAlias.kt new file mode 100644 index 00000000..08f472ed --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/extentions/TypeAlias.kt @@ -0,0 +1,3 @@ +package com.katanox.tabour.extentions + +typealias ConsumerAction = (ByteArray) -> Unit \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/factory/BusType.kt b/src/main/kotlin/com/katanox/tabour/factory/BusType.kt new file mode 100644 index 00000000..79044c64 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/factory/BusType.kt @@ -0,0 +1,5 @@ +package com.katanox.tabour.factory + +enum class BusType { + SQS +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/factory/EventConsumerFactory.kt b/src/main/kotlin/com/katanox/tabour/factory/EventConsumerFactory.kt new file mode 100644 index 00000000..72d2906a --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/factory/EventConsumerFactory.kt @@ -0,0 +1,40 @@ +package com.katanox.tabour.factory + +import com.katanox.tabour.base.IEventConsumerBase +import com.katanox.tabour.config.TabourAutoConfigs +import com.katanox.tabour.integration.sqs.core.consumer.SqsEventHandler +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component +import java.util.* + +@Component +class EventConsumerFactory { + + private val eventConsumers = EnumMap>(BusType::class.java) + + init { + BusType.values().forEach { eventConsumers[it] = ArrayList() } + } + + @Autowired + private lateinit var tabourAutoConfigs: TabourAutoConfigs + + fun getEventConsumer(type: BusType, busName: String, consume: (Any) -> Unit): IEventConsumerBase { + return when (type) { + BusType.SQS -> { + val handler = SqsEventHandler(busName, consume, tabourAutoConfigs) + (eventConsumers[BusType.SQS] as ArrayList).add(handler) + handler + } + } + } + + fun getEventConsumers(type: BusType): List { + return when (type) { + BusType.SQS -> { + eventConsumers[type] ?: listOf() + } + } + + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/factory/EventPublisherFactory.kt b/src/main/kotlin/com/katanox/tabour/factory/EventPublisherFactory.kt new file mode 100644 index 00000000..b4993665 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/factory/EventPublisherFactory.kt @@ -0,0 +1,24 @@ +package com.katanox.tabour.factory + +import com.katanox.tabour.base.IEventPublisherBase +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component +import java.util.* + +@Component +class EventPublisherFactory @Autowired constructor(services: List) { + + private val eventPublishers: MutableMap = + EnumMap(BusType::class.java) + + init { + for (service in services) { + eventPublishers[service.getType()] = service + } + } + + fun getEventPublisher(type: BusType): IEventPublisherBase { + return eventPublishers[type] + ?: throw RuntimeException("Unknown service type: $type") + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsAutoConfigurationLifecycle.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsAutoConfigurationLifecycle.kt new file mode 100644 index 00000000..15f63777 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsAutoConfigurationLifecycle.kt @@ -0,0 +1,25 @@ +package com.katanox.tabour.integration.sqs.config + +import com.katanox.tabour.integration.sqs.core.consumer.SqsEventHandlerRegistry +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.context.event.ApplicationReadyEvent +import org.springframework.context.ApplicationListener +import org.springframework.stereotype.Component +import javax.annotation.PreDestroy + + +class SqsAutoConfigurationLifecycle( + private val registry: SqsEventHandlerRegistry +) : ApplicationListener { + + + override fun onApplicationEvent(event: ApplicationReadyEvent) { + registry.start() + } + + @PreDestroy + fun destroy() { + registry.stop() + } + +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsConfiguration.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsConfiguration.kt new file mode 100644 index 00000000..8304e035 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsConfiguration.kt @@ -0,0 +1,39 @@ +package com.katanox.tabour.integration.sqs.config + +import com.amazonaws.auth.AWSCredentialsProvider +import com.amazonaws.auth.AWSStaticCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.services.sqs.AmazonSQSAsync +import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs +import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Primary + +@Configuration(proxyBeanMethods = false) +@EnableSqs +class SqsConfiguration(@Autowired val sqsProperties: SqsProperties) { + + @Bean + @Primary + fun amazonSQSAsync(): AmazonSQSAsync { + return AmazonSQSAsyncClientBuilder.standard() + .withCredentials(credentialsProvider()) + .withRegion(sqsProperties.region) + .build() + } + + @Bean + fun credentialsProvider(): AWSCredentialsProvider { + return AWSStaticCredentialsProvider(BasicAWSCredentials(sqsProperties.accessKey, sqsProperties.secretKey)) + } + + @Bean + fun queueMessagingTemplate(): QueueMessagingTemplate { + return QueueMessagingTemplate(amazonSQSAsync()) + } + + +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsConsumptionConfiguration.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsConsumptionConfiguration.kt new file mode 100644 index 00000000..f1b4921e --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsConsumptionConfiguration.kt @@ -0,0 +1,64 @@ +package com.katanox.tabour.integration.sqs.config + +import com.amazonaws.services.sqs.AmazonSQSAsync +import com.katanox.tabour.config.EventHandlerProperties +import com.katanox.tabour.config.EventPollerProperties +import com.katanox.tabour.factory.BusType +import com.katanox.tabour.factory.EventConsumerFactory +import com.katanox.tabour.integration.sqs.core.consumer.SqsEventHandler +import com.katanox.tabour.integration.sqs.core.consumer.SqsEventHandlerRegistry +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.core.task.AsyncTaskExecutor +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor + +@Configuration(proxyBeanMethods = false) +@ConditionalOnProperty(prefix = "tabour.sqs", name = ["enable-consumption"], havingValue = "true") +class SqsConsumptionConfiguration(@Autowired val sqsProperties: SqsProperties) { + + @Bean + fun simpleMessageListenerContainerFactory( + amazonSQSAsync: AmazonSQSAsync + ): SimpleMessageListenerContainerFactory? { + val factory = SimpleMessageListenerContainerFactory() + factory.setAmazonSqs(amazonSQSAsync) + factory.setAutoStartup(sqsProperties.autoStartup) + factory.setMaxNumberOfMessages(sqsProperties.maxNumberOfMessages) + factory.setTaskExecutor(createDefaultTaskExecutor()) + return factory + } + + protected fun createDefaultTaskExecutor(): AsyncTaskExecutor { + val threadPoolTaskExecutor = ThreadPoolTaskExecutor() + threadPoolTaskExecutor.setThreadNamePrefix("SQS-") + threadPoolTaskExecutor.corePoolSize = sqsProperties.corePoolSize + threadPoolTaskExecutor.maxPoolSize = sqsProperties.maxPoolSize + threadPoolTaskExecutor.setQueueCapacity(sqsProperties.queueCapacity) + threadPoolTaskExecutor.afterPropertiesSet() + return threadPoolTaskExecutor + } + + + @Bean + fun sqsMessageHandlerRegistry( + eventConsumerFactory: EventConsumerFactory, + eventHandlerProperties: EventHandlerProperties, + eventPollerProperties: EventPollerProperties, + sqsConfiguration: SqsConfiguration + ): SqsEventHandlerRegistry { + return SqsEventHandlerRegistry( + eventConsumerFactory.getEventConsumers(BusType.SQS) as List, + eventHandlerProperties, + eventPollerProperties, + sqsConfiguration + ) + } + + @Bean + fun sqsLifecycle(registry: SqsEventHandlerRegistry): SqsAutoConfigurationLifecycle { + return SqsAutoConfigurationLifecycle(registry) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsProperties.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsProperties.kt new file mode 100644 index 00000000..fb3fbc5f --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/config/SqsProperties.kt @@ -0,0 +1,83 @@ +package com.katanox.tabour.integration.sqs.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.stereotype.Component + +@ConfigurationProperties(prefix = "tabour.sqs") +@Component +data class SqsProperties( + /** + * + * The AWS access key. + * + */ + var accessKey: String = "", + + /** + * + * The AWS secret access key. + * + */ + var secretKey: String = "", + + /** + * + * Sets the region to be used by the client. This will be used to determine both the service endpoint (eg: https://sns.us-west-1.amazonaws.com) + * and signing region (eg: us-west-1) for requests. + * + */ + var region: String = "", + + /** + * + * Configures if this listening container should be automatically started. + * + * The default is true + */ + var autoStartup: Boolean = true, + + /** + * Configure the maximum number of messages that should be retrieved during one poll to the Amazon SQS system. + * This number must be a positive, non-zero number that has a maximum number of 10. + * Values higher then 10 are currently not supported by the queueing system. + * the maximum number of messages (between 1-10) + * + * + * The default is 10 + * + */ + var maxNumberOfMessages: Int = 10, + + /** + * Set the ThreadPoolExecutor's core pool size, that is being used by SQS + * + * + * Default is 1. + */ + var corePoolSize: Int = 1, + + /** + * Set the ThreadPoolExecutor's maximum pool size, that is being used by SQS + * + * + * Default is Integer.MAX_VALUE. + */ + var maxPoolSize: Int = Int.MAX_VALUE, + + /** + * Set the capacity for the ThreadPoolExecutor's BlockingQueue, that is being used by SQS + * Any positive value will lead to a LinkedBlockingQueue instance; + * Any other value will lead to a SynchronousQueue instance + * + * + * Default is Integer.MAX_VALUE. + */ + var queueCapacity: Int = 100, + + /** + * Configures if this the sqs listeners should be starting + * + * The default is false + */ + var enableConsumption: Boolean = false, +) \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher.kt new file mode 100644 index 00000000..770cff95 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventFetcher.kt @@ -0,0 +1,43 @@ +package com.katanox.tabour.integration.sqs.core.consumer + +import com.amazonaws.services.sqs.model.Message +import com.amazonaws.services.sqs.model.ReceiveMessageRequest +import com.katanox.tabour.config.EventPollerProperties +import com.katanox.tabour.integration.sqs.config.SqsConfiguration +import mu.KotlinLogging +import org.springframework.beans.factory.annotation.Autowired + +private val logger = KotlinLogging.logger {} + +class SqsEventFetcher( + private val queueUrl: String, + private val sqsConfiguration: SqsConfiguration, + private val properties: EventPollerProperties, +) { + + fun fetchMessages(): List { + val request = ReceiveMessageRequest() + .withMaxNumberOfMessages(properties.batchSize) + .withQueueUrl(queueUrl) + .withWaitTimeSeconds(properties.waitTime.seconds.toInt()) + .withVisibilityTimeout(properties.visibilityTimeout.seconds.toInt()) + val result = sqsConfiguration.amazonSQSAsync().receiveMessage(request) + if (result.sdkHttpMetadata == null) { + logger.error( + "cannot determine success from response for SQS queue {}: {}", + queueUrl, + result.sdkResponseMetadata + ) + return emptyList() + } + if (result.sdkHttpMetadata.httpStatusCode != 200) { + logger.error( + "got error response from SQS queue {}: {}", + queueUrl, + result.sdkHttpMetadata + ) + return emptyList() + } + return result.messages + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler.kt new file mode 100644 index 00000000..f752c8c7 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventHandler.kt @@ -0,0 +1,59 @@ +package com.katanox.tabour.integration.sqs.core.consumer + +import com.katanox.tabour.base.IEventConsumerBase +import com.katanox.tabour.config.TabourAutoConfigs +import com.katanox.tabour.extentions.ConsumerAction +import io.github.resilience4j.retry.event.RetryOnErrorEvent +import mu.KotlinLogging +import org.springframework.context.annotation.Scope +import org.springframework.context.annotation.ScopedProxyMode +import org.springframework.stereotype.Component + +private val logger = KotlinLogging.logger {} + +@Component +@Scope(proxyMode = ScopedProxyMode.TARGET_CLASS, scopeName = "prototype") +class SqsEventHandler( + val sqsQueueUrl: String = "", + val consumerAction: ConsumerAction = {}, + val tabourConfigs: TabourAutoConfigs +) : IEventConsumerBase { + /** + * Called just before the handle() method is being called. You can implement this method to + * initialize the thread handling the message with [ThreadLocal]s or add an MDC context for + * logging or something similar. Just make sure that you clean up after yourself in the + * onAfterHandle() method. + * + * + * The default implementation does nothing. + */ + fun onBeforeHandle(message: ByteArray) { + } + + /** + * Called after a message has been handled, irrespective of the success. In case of an exception + * during the invocation of handle(), onAfterHandle() is called AFTER the exception has been + * handled by an [ExceptionHandler] so that the exception handler still has any context that + * might have been set in onBeforeHandle(). + * + * + * The default implementation does nothing. + */ + fun onAfterHandle(message: ByteArray) {} + + fun handle(message: ByteArray) { + val retry = tabourConfigs.retryRegistry().retry("handler") + retry + .eventPublisher + .onError { event: RetryOnErrorEvent? -> + logger.warn( + "error {} handling message {}", + event, + message + ) + } + retry.executeRunnable { consumerAction(message) } + } + + +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventHandlerRegistry.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventHandlerRegistry.kt new file mode 100644 index 00000000..624d7346 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventHandlerRegistry.kt @@ -0,0 +1,88 @@ +package com.katanox.tabour.integration.sqs.core.consumer + +import com.katanox.tabour.config.EventHandlerProperties +import com.katanox.tabour.config.EventPollerProperties +import com.katanox.tabour.exception.ExceptionHandler +import com.katanox.tabour.integration.sqs.config.SqsConfiguration +import com.katanox.tabour.thread.ThreadPools +import mu.KotlinLogging +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.ThreadPoolExecutor + +private val logger = KotlinLogging.logger {} + + +class SqsEventHandlerRegistry( + eventHandlers: List, + var eventHandlerProperties: EventHandlerProperties, + var eventPollerProperties: EventPollerProperties, + var sqsConfiguration: SqsConfiguration +) { + private var pollers: Set = setOf() + + init { + pollers = initializePollers(eventHandlers) + } + + private fun initializePollers( + registrations: List + ): Set { + val pollers: MutableSet = HashSet() + for (registration in registrations) { + pollers.add(createPollerForHandler(registration)) + logger.info("initialized SqsMessagePoller '{}'", registration.javaClass::getCanonicalName.name) + } + return pollers + } + + private fun createPollerForHandler( + registration: SqsEventHandler + ): SqsEventPoller { + return SqsEventPoller( + name = registration.javaClass::getCanonicalName.name, + eventHandler = registration, + eventFetcher = createFetcherForHandler(registration), + pollerThreadPool = createPollingThreadPool(registration), + handlerThreadPool = createHandlerThreadPool(registration), + pollingProperties = eventPollerProperties, + sqsConfiguration = sqsConfiguration, + exceptionHandler = ExceptionHandler.defaultExceptionHandler() + ) + } + + private fun createFetcherForHandler(registration: SqsEventHandler): SqsEventFetcher { + return SqsEventFetcher(registration.sqsQueueUrl, sqsConfiguration, eventPollerProperties) + } + + private fun createPollingThreadPool( + registration: SqsEventHandler + ): ScheduledThreadPoolExecutor { + return ThreadPools.blockingScheduledThreadPool( + EventPollerProperties().pollingThreads, + String.format("%s-poller", registration.javaClass::getCanonicalName.name) + ) + } + + private fun createHandlerThreadPool( + registration: SqsEventHandler + ): ThreadPoolExecutor { + return ThreadPools.blockingThreadPool( + eventHandlerProperties.threadPoolSize, + eventHandlerProperties.queueSize, + String.format("%s-handler", registration.javaClass::getCanonicalName.name) + ) + } + + fun start() { + for (poller in pollers!!) { + poller.start() + } + } + + fun stop() { + for (poller in pollers!!) { + poller.stop() + } + } + +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventPoller.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventPoller.kt new file mode 100644 index 00000000..25e7ea1e --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/consumer/SqsEventPoller.kt @@ -0,0 +1,87 @@ +package com.katanox.tabour.integration.sqs.core.consumer + +import com.amazonaws.services.sqs.model.Message +import com.katanox.tabour.config.EventPollerProperties +import com.katanox.tabour.exception.ExceptionHandler +import com.katanox.tabour.integration.sqs.config.SqsConfiguration +import mu.KotlinLogging +import org.springframework.beans.factory.annotation.Autowired +import java.util.concurrent.ScheduledThreadPoolExecutor +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit + +/** + * Polls messages from an SQS queue in potentially multiple threads at regular intervals. + * + * @param the type of message. + */ +private val logger = KotlinLogging.logger {} + +class SqsEventPoller( + private val name: String, + private val eventHandler: SqsEventHandler, + private val eventFetcher: SqsEventFetcher, + private val pollerThreadPool: ScheduledThreadPoolExecutor, + private val handlerThreadPool: ThreadPoolExecutor, + private val pollingProperties: EventPollerProperties, + private val sqsConfiguration: SqsConfiguration, + private val exceptionHandler: ExceptionHandler +) { + fun start() { + logger.info("starting SqsMessagePoller") + for (i in 0 until pollerThreadPool.corePoolSize) { + logger.info("starting SqsMessagePoller ({}) - thread {}", name, i) + pollerThreadPool.scheduleWithFixedDelay( + { pollMessages() }, + pollingProperties.pollDelay.seconds, + pollingProperties.pollDelay.seconds, + TimeUnit.SECONDS + ) + } + } + + fun stop() { + logger.info("stopping SqsMessagePoller") + pollerThreadPool.shutdownNow() + handlerThreadPool.shutdownNow() + } + + private fun pollMessages() { + try { + val messages: List = eventFetcher.fetchMessages() + for (sqsMessage in messages) { + handleMessage(sqsMessage) + } + } catch (e: Exception) { + logger.error("error fetching messages from queue {}:", eventHandler.sqsQueueUrl, e) + } + } + + private fun handleMessage(sqsMessage: Message) { + logger.info("Received message ID {}", sqsMessage.messageId) + val message = sqsMessage.body + handlerThreadPool.submit { + try { + eventHandler.onBeforeHandle(message.toByteArray()) + eventHandler.handle(message.toByteArray()) + acknowledgeMessage(sqsMessage) + logger.debug( + "message {} processed successfully - message has been deleted from SQS", + sqsMessage.messageId + ) + } catch (e: Exception) { + when (exceptionHandler.handleException(sqsMessage, e)) { + ExceptionHandler.ExceptionHandlerDecision.RETRY -> { + } + ExceptionHandler.ExceptionHandlerDecision.DELETE -> acknowledgeMessage(sqsMessage) + } + } finally { + eventHandler.onAfterHandle(message.toByteArray()) + } + } + } + + private fun acknowledgeMessage(message: Message) { + sqsConfiguration.amazonSQSAsync().deleteMessage(eventHandler.sqsQueueUrl, message.receiptHandle) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/integration/sqs/core/publisher/SqsEventPublisher.kt b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/publisher/SqsEventPublisher.kt new file mode 100644 index 00000000..d6f2263c --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/integration/sqs/core/publisher/SqsEventPublisher.kt @@ -0,0 +1,72 @@ +package com.katanox.tabour.integration.sqs.core.publisher + +import com.amazonaws.services.sqs.model.SendMessageRequest +import com.katanox.tabour.base.IEventPublisherBase +import com.katanox.tabour.config.TabourAutoConfigs +import com.katanox.tabour.factory.BusType +import com.katanox.tabour.integration.sqs.config.SqsConfiguration +import mu.KotlinLogging +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Component +import java.io.Serializable + +private val logger = KotlinLogging.logger {} + +@Component +class SqsEventPublisher : IEventPublisherBase { + + + @Autowired + private lateinit var sqsConfiguration: SqsConfiguration + + @Autowired + private lateinit var tabourConfigs: TabourAutoConfigs + + override fun getType(): BusType { + return BusType.SQS + } + + override fun publish(message: T, busUrl: String) { + publish(message, busUrl, SendMessageRequest()) + } + + /** + * Publishes a message with a pre-configured [SendMessageRequest] which gives you all the + * options you may need from the underlying SQS client. Note that the `queueUrl` and `messageBody` + * must not be set because they will be set by the this publisher. + */ + private fun publish(message: T, busUrl: String, preConfiguredRequest: SendMessageRequest) { + require(preConfiguredRequest.queueUrl == null) { "attribute queueUrl of pre-configured request must not be set!" } + require(preConfiguredRequest.messageBody == null) { "message body of pre-configured request must not be set!" } + val retry = tabourConfigs.retryRegistry().retry("publish") + retry + .eventPublisher + .onError { + logger.warn( + "error publishing message to queue {}", + busUrl + ) + } + retry.executeRunnable { doPublish(message, busUrl, preConfiguredRequest) } + } + + private fun doPublish(message: T, busUrl: String, preConfiguredRequest: SendMessageRequest) { + logger.debug( + "sending message {} to SQS queue {}", message.toString(), busUrl + ) + val request = preConfiguredRequest + .withQueueUrl(busUrl) + .withMessageBody(message.toString()) + val result = sqsConfiguration.amazonSQSAsync().sendMessage(request) + if (result.sdkHttpMetadata.httpStatusCode != 200) { + throw RuntimeException( + String.format( + "got error response from SQS queue %s: %s", + busUrl, result.sdkHttpMetadata + ) + ) + } + logger.info("Sent message ID {}", result.messageId) + } + +} diff --git a/src/main/kotlin/com/katanox/tabour/thread/NamedThreadFactory.kt b/src/main/kotlin/com/katanox/tabour/thread/NamedThreadFactory.kt new file mode 100644 index 00000000..10d5e399 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/thread/NamedThreadFactory.kt @@ -0,0 +1,17 @@ +package com.katanox.tabour.thread + +import java.util.concurrent.ThreadFactory +import java.util.concurrent.atomic.AtomicInteger + +class NamedThreadFactory( + private val currentThreadCount: AtomicInteger = AtomicInteger(0), + private val threadNamePrefix: String +) : ThreadFactory { + + + override fun newThread(runnable: Runnable?): Thread { + val threadNumber = currentThreadCount.incrementAndGet() + val threadName = String.format("%s-%d", threadNamePrefix, threadNumber) + return Thread(runnable, threadName) + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/katanox/tabour/thread/ThreadPools.kt b/src/main/kotlin/com/katanox/tabour/thread/ThreadPools.kt new file mode 100644 index 00000000..f10b7383 --- /dev/null +++ b/src/main/kotlin/com/katanox/tabour/thread/ThreadPools.kt @@ -0,0 +1,42 @@ +package com.katanox.tabour.thread + +import java.util.concurrent.* + +object ThreadPools { + + fun blockingThreadPool(threads: Int, queueSize: Int, poolName: String): ThreadPoolExecutor { + return ThreadPoolExecutor( + threads, + threads, + 0L, + TimeUnit.SECONDS, + ArrayBlockingQueue(queueSize), + NamedThreadFactory(threadNamePrefix = poolName), + retryPolicy() + ) + } + + fun blockingScheduledThreadPool( + threads: Int, poolName: String + ): ScheduledThreadPoolExecutor { + return ScheduledThreadPoolExecutor( + threads, NamedThreadFactory(threadNamePrefix = poolName), retryPolicy() + ) + } + + /** + * Re-Queues a rejected [Runnable] into the thread pool's blocking queue, making the + * submitting thread wait until the threadpool has capacity again. + */ + private fun retryPolicy(): RejectedExecutionHandler { + return RejectedExecutionHandler { r: Runnable, executor: ThreadPoolExecutor -> + try { + executor.queue.put(r) + } catch (e: InterruptedException) { + throw RuntimeException(e) + } + } + } + + +} \ No newline at end of file