Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add e2eTest for continuous receive #193

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 51 additions & 9 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
import com.bnorm.power.PowerAssertGradleExtension
import kotlinx.knit.KnitPluginExtension
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.*
import org.gradle.api.tasks.testing.logging.TestLogEvent
import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import org.jetbrains.dokka.gradle.DokkaTask

plugins {
Expand All @@ -17,10 +11,12 @@ plugins {
alias(libs.plugins.knit)
alias(libs.plugins.publish)
alias(libs.plugins.power.assert)
idea
}

repositories {
mavenCentral()
maven("https://oss.sonatype.org/content/repositories/snapshots")
}

group = "io.github.nomisrev"
Expand All @@ -34,8 +30,15 @@ dependencies {

testImplementation(kotlin("test"))
testImplementation(libs.testcontainers.kafka)
testImplementation(libs.slf4j.simple)
testImplementation(libs.kotlinx.coroutines.test)
testImplementation(libs.jackson.kotlin)
testImplementation(libs.jackson.databind)
testImplementation(libs.kotest.framework.api)
testImplementation(libs.kotest.runner.junit5)
testImplementation(libs.kotest.property)
testImplementation(libs.stove.testing)
testImplementation(libs.stove.testing.kafka)
testImplementation(libs.logback.classic)
}

configure<PowerAssertGradleExtension> {
Expand All @@ -52,10 +55,34 @@ configure<JavaPluginExtension> {
}
}

sourceSets {
@Suppress("LocalVariableName", "ktlint:standard:property-naming")
val `test-e2e` by creating {
compileClasspath += sourceSets.main.get().output
runtimeClasspath += sourceSets.main.get().output
}

val testE2eImplementation by configurations.getting {
extendsFrom(configurations.testImplementation.get())
}
configurations["testE2eRuntimeOnly"].extendsFrom(configurations.runtimeOnly.get())
}

idea {
module {
testSources.from(sourceSets["test-e2e"].allSource.sourceDirectories)
testResources.from(sourceSets["test-e2e"].resources.sourceDirectories)
isDownloadJavadoc = true
isDownloadSources = true
}
}

kotlin {
explicitApi()
jvmToolchain(17)
}


tasks {
withType<DokkaTask>().configureEach {
outputDirectory.set(rootDir.resolve("docs"))
Expand Down Expand Up @@ -88,5 +115,20 @@ tasks {
exceptionFormat = FULL
events = setOf(SKIPPED, FAILED, STANDARD_ERROR)
}
jvmArgs("--add-opens", "java.base/java.util=ALL-UNNAMED")
}

task<Test>("e2eTest") {
description = "Runs e2e tests."
group = "verification"
testClassesDirs = sourceSets["test-e2e"].output.classesDirs
classpath = sourceSets["test-e2e"].runtimeClasspath

useJUnitPlatform()
reports {
junitXml.required.set(true)
html.required.set(true)
}
jvmArgs("--add-opens", "java.base/java.util=ALL-UNNAMED")
}
}
14 changes: 14 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ slf4j = "2.0.12"
spotless="6.25.0"
publish="0.28.0"
power-assert="0.13.0"
stove = "1.0.0-SNAPSHOT"
jackson = "2.17.1"

[libraries]
kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" }
Expand All @@ -26,6 +28,18 @@ testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "tes
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }

jackson-kotlin = { module = "com.fasterxml.jackson.module:jackson-module-kotlin", version.ref = "jackson" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
logback-classic = { module = "ch.qos.logback:logback-classic", version = "1.5.6" }

# Testing
stove-testing = { module = "com.trendyol:stove-testing-e2e", version.ref = "stove" }
stove-ktor-testing = { module = "com.trendyol:stove-ktor-testing-e2e", version.ref = "stove" }
stove-testing-kafka = { module = "com.trendyol:stove-testing-e2e-kafka", version.ref = "stove" }
kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" }
kotest-framework-api = { module = "io.kotest:kotest-framework-api", version.ref = "kotest" }


[plugins]
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" }
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ rootProject.name = "kotlin-kafka"
dependencyResolutionManagement {
repositories {
mavenCentral()
maven("https://oss.sonatype.org/content/repositories/snapshots")
}
}


include(":guide")
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.github.nomisRev.kafka.e2e.setup

import com.trendyol.stove.testing.e2e.system.abstractions.ApplicationUnderTest
import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared
import io.github.nomisRev.kafka.e2e.setup.example.ReceiveMethod
import io.github.nomisRev.kafka.e2e.setup.example.StoveKafkaValueDeserializer
import io.github.nomisRev.kafka.e2e.setup.example.StoveKafkaValueSerializer
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import io.github.nomisRev.kafka.receiver.CommitStrategy
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.*
import kotlin.time.Duration.Companion.seconds

/**
* Stove's Kafka application under test implementation
*/
class KafkaApplicationUnderTest : ApplicationUnderTest<Unit> {
private lateinit var client: AdminClient
private val consumers: MutableList<AutoCloseable> = mutableListOf()

override suspend fun start(configurations: List<String>) {
val bootstrapServers = configurations.first { it.contains("kafka.servers", true) }.split('=')[1]
val interceptorClass = configurations.first { it.contains("kafka.interceptor-classes", true) }.split('=')[1]
val receiveMethod = configurations.first { it.contains("kafka.receive-method", true) }.split('=')[1]
client = createAdminClient(bootstrapServers)
createTopics(client)
startConsumers(bootstrapServers, interceptorClass, ReceiveMethod.from(receiveMethod))
}

private fun createAdminClient(bootstrapServers: String): AdminClient {
return mapOf<String, Any>(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers
).let { AdminClient.create(it) }
}

private fun createTopics(client: AdminClient) {
val newTopics = KafkaTestShared.topics.flatMap {
listOf(it.topic, it.retryTopic, it.deadLetterTopic)
}.map { NewTopic(it, 1, 1) }
client.createTopics(newTopics).all().get()
}

private fun startConsumers(bootStrapServers: String, interceptorClass: String, receiveMethod: ReceiveMethod) {
val (publisher, receiver) = createPublisherAndReceiver(interceptorClass, bootStrapServers)
val configuredConsumers = KafkaTestShared.consumers(receiver, publisher, receiveMethod)
configuredConsumers.forEach { it.start() }
consumers.addAll(configuredConsumers)
}

private fun createPublisherAndReceiver(
interceptorClass: String, bootStrapServers: String
): Pair<KafkaPublisher<String, Any>, KafkaReceiver<String, Any>> {
val consumerSettings = mapOf(
ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG to "2000",
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG to "true", // Expected to be created by the client
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(interceptorClass)
)

val receiverSettings = ReceiverSettings(bootstrapServers = bootStrapServers,
valueDeserializer = StoveKafkaValueDeserializer(),
keyDeserializer = StringDeserializer(),
groupId = "stove-application-consumers",
commitStrategy = CommitStrategy.ByTime(2.seconds),
pollTimeout = 1.seconds,
properties = Properties().apply {
putAll(consumerSettings)
})

val producerSettings = PublisherSettings<String, Any>(bootStrapServers,
StringSerializer(),
StoveKafkaValueSerializer(),
properties = Properties().apply {
put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, listOf(interceptorClass)
)
})

val publisher = KafkaPublisher(producerSettings)
val receiver = KafkaReceiver(receiverSettings)
return Pair(publisher, receiver)
}

override suspend fun stop() {
client.close()
consumers.forEach { it.close() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.github.nomisRev.kafka.e2e.setup

import com.trendyol.stove.testing.e2e.standalone.kafka.KafkaSystemOptions
import com.trendyol.stove.testing.e2e.standalone.kafka.kafka
import com.trendyol.stove.testing.e2e.system.TestSystem
import io.kotest.core.config.AbstractProjectConfig

class ProjectConfig : AbstractProjectConfig() {
override suspend fun beforeProject(): Unit = TestSystem()
.with {
kafka {
KafkaSystemOptions(
configureExposedConfiguration = { cfg ->
listOf(
"kafka.servers=${cfg.bootstrapServers}",
"kafka.interceptor-classes=${cfg.interceptorClass}",
"kafka.receive-method=kotlin-kafka" // here we can change to: 'kotlin-kafka' or 'traditional'
)
}
)
}
applicationUnderTest(KafkaApplicationUnderTest())
}.run()

override suspend fun afterProject(): Unit = TestSystem.stop()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.github.nomisRev.kafka.e2e.setup.example

import io.github.nomisRev.kafka.e2e.setup.example.KafkaTestShared.TopicDefinition
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.flattenConcat
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import java.time.Duration

/**
* Supervisor that uses KafkaReceiver to retrieve messages from Kafka and handle them accordingly
*/
abstract class ConsumerSupervisor<K, V>(
private val receiver: KafkaReceiver<K, V>,
private val publisher: KafkaPublisher<K, V>,
private val receiveMethod: ReceiveMethod
) : AutoCloseable {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val logger = org.slf4j.LoggerFactory.getLogger(javaClass)

abstract val topicDefinition: TopicDefinition

/**
* Here we start the consumer
* We can use either KotlinKafka receiver or traditional while(true) loop to receive messages
* Traditional while(true) loop is successful in receiving messages continuously
* KotlinKafka receiver, continuously receives messages
*/
fun start() = when (receiveMethod) {
ReceiveMethod.KOTLIN_KAFKA_RECEIVE -> kotlinKafkaReceive()
ReceiveMethod.TRADITIONAL_RECEIVE -> traditionalReceive()
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun kotlinKafkaReceive() {
scope.launch {
receiver.receiveAutoAck(
listOf(
topicDefinition.topic,
topicDefinition.retryTopic,
topicDefinition.deadLetterTopic
)
).flattenConcat()
.collect { message ->
logger.info("Message RECEIVED on the application side with KotlinKafka receiver: ${message.value()}")
received(message) // expected to receive the messages continuously?
}
}
}

private fun traditionalReceive() {
scope.launch {
receiver.withConsumer { consumer ->
consumer.subscribe(
listOf(
topicDefinition.topic,
topicDefinition.retryTopic,
topicDefinition.deadLetterTopic
)
)
while (isActive) {
val records = consumer.poll(Duration.ofMillis(500))
records.forEach { record ->
logger.info("Message RECEIVED on the application side with traditional while(true) loop: ${record.value()}")
received(record) {
consumer.commitAsync()
}
}
}
}
}
}


abstract suspend fun consume(record: ConsumerRecord<K, V>)

protected open suspend fun handleError(message: ConsumerRecord<K, V>, e: Exception) {
logger.error("Failed to process message: $message", e)
}

private suspend fun received(message: ConsumerRecord<K, V>, onSuccess: (ConsumerRecord<K, V>) -> Unit = { }) {
try {
consume(message)
onSuccess(message)
logger.info("Message COMMITTED on the application side: ${message.value()}")
} catch (e: Exception) {
handleError(message, e)
logger.warn("CONSUMER GOT an ERROR on the application side, exception: $e")
val record = ProducerRecord<K, V>(
topicDefinition.deadLetterTopic,
message.partition(),
message.key(),
message.value(),
message.headers()
)
try {
publisher.publishScope { offer(record) }
} catch (e: Exception) {
logger.error("Failed to publish message to dead letter topic: $message", e)
}
}
}

override fun close(): Unit = runBlocking {
try {
scope.cancel()
} catch (e: Exception) {
logger.error("Failed to stop consuming", e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.github.nomisRev.kafka.e2e.setup.example

object DomainEvents {
data class ProductCreated(val productId: String)
}
Loading
Loading