Distribute records from the default executor
This is a hacked-together POC. Do not merge!
erikvanoosten committed Dec 4, 2024
1 parent e689977 commit a2efbe0
Showing 1 changed file with 64 additions and 44 deletions.
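
Before this change the run loop distributed records itself: offerRecordsToStreams was called directly from the poll-handling code, which runs on the run loop's dedicated consumer thread (see the ZIO.onExecutor call near the end of the diff). With this POC each poll result is packaged into a PollData value, offered to a small bounded queue, and drained by a separate fiber (processPollDataQueue) running on ZIO's default executor. Below is a minimal, self-contained sketch of that hand-off pattern; the types and names used here (PollHandOffSketch, a PollData carrying plain strings, poll, distribute) are illustrative placeholders, not the real zio-kafka internals.

import zio._
import zio.stream.ZStream

object PollHandOffSketch extends ZIOAppDefault {

  // Illustrative stand-in for the commit's PollData; it only carries strings.
  final case class PollData(records: Chunk[String])

  // Simulated poll; in the real Runloop this calls the Kafka consumer on a
  // dedicated, single-threaded executor.
  def poll(n: Int): UIO[PollData] =
    ZIO.succeed(PollData(Chunk.fromIterable((1 to 3).map(i => s"record-$n-$i"))))

  // Simulated distribution; the real code offers CommittableRecords to the
  // per-partition streams.
  def distribute(data: PollData): UIO[Unit] =
    ZIO.foreachDiscard(data.records)(r => ZIO.debug(s"distributing $r"))

  override def run: ZIO[Scope, Nothing, Unit] =
    for {
      // Small bounded queue: offer suspends when distribution lags behind,
      // back-pressuring the poll loop.
      queue <- ZIO.acquireRelease(Queue.bounded[PollData](2))(_.shutdown)
      // Drain the queue on the default executor, like processPollDataQueue.
      consumer <- ZStream.fromQueueWithShutdown(queue).mapZIO(distribute).runDrain.forkScoped
      // Poll loop: each poll result is handed off instead of distributed inline.
      _ <- ZIO.foreachDiscard(1 to 5)(n => poll(n).flatMap(queue.offer))
      // Wait for the queue to empty, then shut it down so the drain fiber ends.
      _ <- queue.size.repeatUntil(_ == 0) *> queue.shutdown *> consumer.join
    } yield ()
}

With a capacity of 2, the poll loop can queue at most two undistributed batches before offer suspends, so distribution lag back-pressures the consumer thread; that is a reading of this POC, not a documented guarantee.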
zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala (64 additions, 44 deletions)
@@ -23,6 +23,7 @@ private[consumer] final class Runloop private (
sameThreadRuntime: Runtime[Any],
consumer: ConsumerAccess,
commandQueue: Queue[RunloopCommand],
pollDataQueue: Queue[PollData],
partitionsHub: Hub[Take[Throwable, PartitionAssignment]],
diagnostics: Diagnostics,
maxStreamPullInterval: Duration,
@@ -103,54 +104,53 @@ private[consumer] final class Runloop private (
)
}

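// Drains the poll-data queue and offers each batch to the partition streams.
// This stream is forked with forkScoped from Runloop.make and is not wrapped
// in ZIO.onExecutor, so it runs on ZIO's default executor rather than on the
// run loop's dedicated consumer thread, which is what the commit title refers to.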
private def processPollDataQueue =
ZStream
.fromQueueWithShutdown(pollDataQueue)
.mapZIO(offerRecordsToStreams)
.runDrain

/**
 * Offer the records retrieved from a poll() call to the partition streams.
 */
private def offerRecordsToStreams(
partitionStreams: Chunk[PartitionStreamControl],
pendingRequests: Chunk[RunloopCommand.Request],
ignoreRecordsForTps: Set[TopicPartition],
polledRecords: ConsumerRecords[Array[Byte], Array[Byte]]
): UIO[Runloop.FulfillResult] = {
private def offerRecordsToStreams(pollData: PollData): UIO[Unit] = {
type Record = CommittableRecord[Array[Byte], Array[Byte]]

import pollData._

// The most efficient way to get the records from [[ConsumerRecords]] per
// topic-partition, is by first getting the set of topic-partitions, and
// then requesting the records per topic-partition.
val tps = polledRecords.partitions().asScala.toSet -- ignoreRecordsForTps
val fulfillResult = Runloop.FulfillResult(pendingRequests = pendingRequests.filter(req => !tps.contains(req.tp)))
val tps = polledRecords.partitions().asScala.toSet -- ignoreRecordsForTps
val streams =
if (tps.isEmpty) Chunk.empty else partitionStreams.filter(streamControl => tps.contains(streamControl.tp))

if (streams.isEmpty) ZIO.succeed(fulfillResult)
else {
for {
consumerGroupMetadata <- getConsumerGroupMetadataIfAny
_ <- ZIO.foreachParDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
if (records.isEmpty) {
streamControl.offerRecords(Chunk.empty)
} else {
val builder = ChunkBuilder.make[Record](records.size())
val iterator = records.iterator()
while (iterator.hasNext) {
val consumerRecord = iterator.next()
builder +=
CommittableRecord[Array[Byte], Array[Byte]](
record = consumerRecord,
commitHandle = committer.commit,
consumerGroupMetadata = consumerGroupMetadata
)
}
streamControl.offerRecords(builder.result())
}
}
} yield fulfillResult
}
ZIO
.foreachParDiscard(streams) { streamControl =>
val tp = streamControl.tp
val records = polledRecords.records(tp)
if (records.isEmpty) {
streamControl.offerRecords(Chunk.empty)
} else {
val builder = ChunkBuilder.make[Record](records.size())
val iterator = records.iterator()
while (iterator.hasNext) {
val consumerRecord = iterator.next()
builder +=
CommittableRecord[Array[Byte], Array[Byte]](
record = consumerRecord,
commitHandle = committer.commit,
consumerGroupMetadata = consumerGroupMetadata
)
}
streamControl.offerRecords(builder.result())
}
}
.when(streams.nonEmpty)
.unit
}

private val getConsumerGroupMetadataIfAny: UIO[Option[ConsumerGroupMetadata]] =
@@ -323,18 +323,27 @@ private[consumer] final class Runloop private (
}
} yield pollresult
}
fulfillResult <- offerRecordsToStreams(
pollResult.assignedStreams,
pollResult.pendingRequests,
pollResult.ignoreRecordsForTps,
pollResult.records
)
consumerGroupMetadata <- getConsumerGroupMetadataIfAny
_ <- pollDataQueue.offer(
PollData(
pollResult.assignedStreams,
pollResult.pendingRequests,
pollResult.ignoreRecordsForTps,
pollResult.records,
consumerGroupMetadata
)
)
_ <- committer.cleanupPendingCommits
_ <- checkStreamPullInterval(pollResult.assignedStreams)
} yield state.copy(
pendingRequests = fulfillResult.pendingRequests,
assignedStreams = pollResult.assignedStreams
)
} yield {
val tps = pollResult.records.partitions().asScala.toSet -- pollResult.ignoreRecordsForTps
val updatedPendingRequests = pollResult.pendingRequests.filter(req => !tps.contains(req.tp))

state.copy(
pendingRequests = updatedPendingRequests,
assignedStreams = pollResult.assignedStreams
)
}
}

/**
@@ -583,6 +592,7 @@ object Runloop {
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized))
commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown)
pollDataQueue <- ZIO.acquireRelease(Queue.bounded[PollData](2))(_.shutdown)
lastRebalanceEvent <- Ref.Synchronized.make[RebalanceEvent](RebalanceEvent.None)
initialState = State.initial
currentStateRef <- Ref.make(initialState)
@@ -611,6 +621,7 @@
sameThreadRuntime = sameThreadRuntime,
consumer = consumer,
commandQueue = commandQueue,
pollDataQueue = pollDataQueue,
partitionsHub = partitionsHub,
diagnostics = diagnostics,
maxStreamPullInterval = maxStreamPullInterval,
@@ -625,6 +636,7 @@

// Run the entire loop on a dedicated thread to avoid executor shifts
executor <- RunloopExecutor.newInstance
_ <- runloop.processPollDataQueue.forkScoped
fiber <- ZIO.onExecutor(executor)(runloop.run(initialState)).forkScoped
waitForRunloopStop = fiber.join.orDie

@@ -652,4 +664,12 @@
)
}

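// Everything captured from a single poll() call that the distribution fiber needs:
// the currently assigned partition streams, the requests pending at poll time, the
// partitions whose records should be ignored (for example after a rebalance), the
// polled records themselves, and the consumer group metadata used to build
// CommittableRecords.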
private case class PollData(
partitionStreams: Chunk[PartitionStreamControl],
pendingRequests: Chunk[RunloopCommand.Request],
ignoreRecordsForTps: Set[TopicPartition],
polledRecords: ConsumerRecords[Array[Byte], Array[Byte]],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
)

}
