Skip to content

Commit

Permalink
Merge pull request #200 from tKe/fix/event-loop-poll
Browse files Browse the repository at this point in the history
fix: event loop stops polling after empty record batch
  • Loading branch information
nomisRev authored Oct 10, 2024
2 parents 9d7604f + 2827eaa commit 2850735
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlin.time.toJavaDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.Dispatchers.Default
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
Expand All @@ -44,6 +44,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

private val logger: Logger =
LoggerFactory.getLogger(EventLoop::class.java)
Expand Down Expand Up @@ -83,7 +84,7 @@ internal class EventLoop<K, V>(
channel.consumeAsFlow()
.onStart {
if (topicNames.isNotEmpty()) subscribe(topicNames)
withContext(scope.coroutineContext) { poll() }
schedulePoll()
commitManager.start()
}.onCompletion {
commitBatchSignal.close()
Expand Down Expand Up @@ -120,8 +121,20 @@ internal class EventLoop<K, V>(
return pausedNow
}

/** Set while a poll task has been launched by [schedulePoll] but has not started running yet. */
private val scheduled = AtomicBoolean(false)

/**
 * Launches one [poll] invocation on [scope], unless a previously launched one is still waiting to start.
 *
 * The flag is claimed with a compare-and-set, so repeated calls made before the launched
 * coroutine begins executing are no-ops. The flag is released as the first step of that
 * coroutine, which lets [poll] itself request the next round through this function.
 */
private fun schedulePoll() {
  val claimed = scheduled.compareAndSet(false, true)
  if (!claimed) return

  scope.launch {
    // Release the flag before polling so a follow-up schedulePoll() is accepted.
    scheduled.set(false)
    @OptIn(DelicateCoroutinesApi::class)
    val closedForSend = channel.isClosedForSend
    // Nothing can be delivered once the channel is closed for send, so skip the poll.
    if (!closedForSend) poll()
  }
}

@ConsumerThread
private fun poll() {
checkConsumerThread("poll")
try {
runCommitIfRequired(false)

Expand Down Expand Up @@ -164,32 +177,35 @@ internal class EventLoop<K, V>(
ConsumerRecords.empty()
}

if (!records.isEmpty) {
if (records.isEmpty) {
schedulePoll()
} else {
if (settings.maxDeferredCommits > 0) {
commitBatch.addUncommitted(records)
}
logger.debug("Attempting to send ${records.count()} records to Channel")
channel.trySend(records)
.onSuccess { poll() }
.onSuccess { schedulePoll() }
.onClosed { error -> logger.error("Channel closed when trying to send records.", error) }
.onFailure { error ->
if (error != null) {
logger.error("Channel send failed when trying to send records.", error)
closeChannel(error)
} else logger.debug("Back-pressuring kafka consumer. Might pause KafkaConsumer on next poll tick.")

isPolling.set(false)

scope.launch(outerContext) {
/* Send the records down,
* when send returns we attempt to send an empty set of records down to test the backpressure.
* If our "backpressure test" returns we start requesting/polling again. */
channel.send(records)
if (isPaused.get()) {
consumer.wakeup()
} else {
logger.debug("Back-pressuring kafka consumer. Might pause KafkaConsumer on next poll tick.")

isPolling.set(false)
scope.launch(outerContext) {
/* Send the records down,
* when send returns we attempt to send an empty set of records down to test the backpressure.
* If our "backpressure test" returns we start requesting/polling again. */
channel.send(records)
if (isPaused.get()) {
consumer.wakeup()
}
isPolling.set(true)
schedulePoll()
}
isPolling.set(true)
poll()
}
}
}
Expand Down Expand Up @@ -567,7 +583,7 @@ private annotation class ConsumerThread
/** Compile-time switch: while `true`, [checkConsumerThread] verifies the calling thread's name. */
private const val DEBUG: Boolean = true

/**
 * Verifies that the caller runs on a thread whose name starts with `kotlin-kafka-`.
 *
 * The verification only happens while [DEBUG] is `true`; otherwise the function simply
 * evaluates to [Unit]. On a mismatch the failure message is prefixed with [msg] and
 * names the thread that was actually found.
 *
 * @param msg label of the calling operation, used as the prefix of the failure message.
 */
private fun checkConsumerThread(msg: String): Unit =
if (DEBUG) require(
if (DEBUG) check(
Thread.currentThread().name.startsWith("kotlin-kafka-")
) { "$msg => should run on kotlin-kafka thread, but found ${Thread.currentThread().name}" }
else Unit
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import io.github.nomisrev.kafka.mapIndexed
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flowOf
Expand All @@ -19,7 +21,9 @@ import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toSet
import kotlinx.coroutines.launch
import kotlinx.coroutines.yield
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Test
import kotlin.test.assertEquals
Expand Down Expand Up @@ -53,6 +57,75 @@ class KafakReceiverSpec : KafkaSpec() {
)
}

/**
 * Starts publishing [produced] in a separate coroutine and, while that is in flight,
 * receives `count` records from the topic, acknowledging each one.
 * The set of received key/value pairs must equal the set of produced pairs.
 */
@Test
fun `All records produced while consuming are received`() = withTopic {
  // Launched rather than awaited, so publishing can overlap with receiving.
  launch { publishToKafka(topic, produced) }

  val received = KafkaReceiver()
    .receive(topic.name())
    .map { record ->
      Pair(record.key(), record.value())
        .also { record.offset.acknowledge() }
    }.take(count)
    .toSet()

  // kotlin.test's assertEquals takes (expected, actual); keeping that order makes
  // the failure message report the produced set as "expected".
  assertEquals(produced.toSet(), received)
}

/**
 * Publishes a large number of records and receives all of them through a receiver that is
 * configured with `max.poll.records = 1`, then checks that every record was counted.
 */
@Test
fun `Continuous receiving does not overflow stack`() = withTopic {
  val largeCount = 20_000 // when it _was_ overflowing stack, it only took ~4095 messages.
  publishScope {
    repeat(largeCount) { index ->
      offer(ProducerRecord(topic.name(), "key-$index", "value-$index"))
    }
  }

  // Commit thresholds sit just above largeCount — presumably so that no size-based commit
  // fires during the run (TODO confirm). max.poll.records = 1 caps each poll at one record.
  val settings = receiverSetting().copy(
    maxDeferredCommits = largeCount + 1,
    commitStrategy = CommitStrategy.BySize(largeCount + 1),
    properties = mapOf(ConsumerConfig.MAX_POLL_RECORDS_CONFIG to "1").toProperties()
  )

  val receivedCount = KafkaReceiver(settings)
    .receive(topic.name())
    .buffer(largeCount)
    .take(largeCount)
    .count()

  // assertEquals takes (expected, actual): the known record count is the expectation.
  assertEquals(largeCount, receivedCount)
}

/**
 * Publishes [produced] up front, then receives `count` records with a collector that
 * yields before handling (and acknowledging) each record.
 * The set of received key/value pairs must equal the set of produced pairs.
 */
@Test
fun `Can consume after backpressure`() = withTopic {
  publishToKafka(topic, produced)

  val received = KafkaReceiver()
    .receive(topic.name())
    .map { record ->
      // Give up the thread before processing each record, slowing the downstream consumer.
      yield()
      Pair(record.key(), record.value())
        .also { record.offset.acknowledge() }
    }.take(count)
    .toSet()

  // assertEquals takes (expected, actual): the produced set is the expectation.
  assertEquals(produced.toSet(), received)
}

/**
 * Publishes [produced], receives and acknowledges `count` records with a size-based commit
 * strategy of `count / 2`, and then checks the committed count reported for the topic.
 */
@Test
fun `Concurrent commits while receiving`() = withTopic {
  publishToKafka(topic, produced)

  val receiver = KafkaReceiver(
    receiverSetting().copy(
      commitStrategy = CommitStrategy.BySize(count / 2)
    )
  )

  receiver.receive(topic.name())
    .take(count)
    .collect { record -> record.offset.acknowledge() }

  // assertEquals takes (expected, actual): every acknowledged record should be committed.
  assertEquals(count.toLong(), receiver.committedCount(topic.name()))
}

@Test
fun `All produced records with headers are received`() = withTopic(partitions = 1) {
val producerRecords = produced.map { (key, value) ->
Expand Down

0 comments on commit 2850735

Please sign in to comment.