Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small cleanup #1065

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
},
test("handle rebalancing by completing topic-partition streams") {
val nrMessages = 50
val nrPartitions = 6
val nrPartitions = 6 // Must be even and strictly positive

for {
// Produce messages on several partitions
Expand Down Expand Up @@ -1142,6 +1142,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
test(
"it's possible to start a new consumption session from a Consumer that had a consumption session stopped previously"
) {
// NOTE:
// When this test fails with the message `100000 was not less than 100000`, it's because
// your computer is so fast that the first consumer already consumed all 100000 messages.
val numberOfMessages: Int = 100000
val kvs: Iterable[(String, String)] = Iterable.tabulate(numberOfMessages)(i => (s"key-$i", s"msg-$i"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.fetch.FetchStrategy
import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
import zio.kafka.consumer.internal.Runloop._
import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
import zio.stream._

Expand Down Expand Up @@ -86,26 +87,20 @@ private[consumer] final class Runloop private (
ZIO.fail(new IllegalStateException(s"Multiple onAssigned calls on rebalance listener"))
},
onRevoked = (_, _) =>
ZIO.logDebug("Rebalancing started") *>
currentStateRef.get.flatMap { state =>
// End all streams
endRevokedPartitions(
state.pendingRequests,
state.assignedStreams,
isRevoked = _ => true
).flatMap { result =>
lastRebalanceEvent.updateZIO {
case None =>
ZIO.some(Runloop.RebalanceEvent.Revoked(result))
case _ =>
ZIO.fail(
new IllegalStateException(
s"onRevoked called on rebalance listener with pending assigned event"
)
)
}
}
}
for {
_ <- ZIO.logDebug("Rebalancing started")
state <- currentStateRef.get
// End all streams
result <- endRevokedPartitions(state.pendingRequests, state.assignedStreams, isRevoked = _ => true)
_ <- lastRebalanceEvent.updateZIO {
case None =>
ZIO.some(Runloop.RebalanceEvent.Revoked(result))
case _ =>
ZIO.fail(
new IllegalStateException(s"onRevoked called on rebalance listener with pending assigned event")
)
}
} yield ()
)

if (restartStreamsOnRebalancing) {
Expand Down Expand Up @@ -483,7 +478,7 @@ private[consumer] final class Runloop private (
* - Poll periodically when we are subscribed but do not have assigned streams yet. This happens after
* initialization and rebalancing
*/
def run(initialState: State): ZIO[Scope, Throwable, Any] = {
private def run(initialState: State): ZIO[Scope, Throwable, Any] = {
import Runloop.StreamOps

ZStream
Expand Down Expand Up @@ -606,26 +601,26 @@ private[consumer] object Runloop {
ZIO.logDebug("Shut down Runloop")
)
} yield runloop
}

private[internal] final case class State(
pendingRequests: Chunk[RunloopCommand.Request],
pendingCommits: Chunk[RunloopCommand.Commit],
assignedStreams: Chunk[PartitionStreamControl],
subscriptionState: SubscriptionState
) {
def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)
private final case class State(
pendingRequests: Chunk[RunloopCommand.Request],
pendingCommits: Chunk[RunloopCommand.Commit],
assignedStreams: Chunk[PartitionStreamControl],
subscriptionState: SubscriptionState
) {
def addCommit(c: RunloopCommand.Commit): State = copy(pendingCommits = pendingCommits :+ c)
def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r)

def shouldPoll: Boolean =
subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
}
def shouldPoll: Boolean =
subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty)
}

object State {
val initial: State = State(
pendingRequests = Chunk.empty,
pendingCommits = Chunk.empty,
assignedStreams = Chunk.empty,
subscriptionState = SubscriptionState.NotSubscribed
)
private object State {
val initial: State = State(
pendingRequests = Chunk.empty,
pendingCommits = Chunk.empty,
assignedStreams = Chunk.empty,
subscriptionState = SubscriptionState.NotSubscribed
)
}
}