Skip to content

Commit

Permalink
Remove lost partitions from assigned streams
Browse files Browse the repository at this point in the history
Fixes #1288
  • Loading branch information
svroonland committed Oct 29, 2024
1 parent 4cd5733 commit b4f95d3
Showing 1 changed file with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ private[consumer] final class Runloop private (
state <- currentStateRef.get
lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
_ <- ZIO.foreachDiscard(lostStreams)(_.lost)
_ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps))
_ <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps, lostStreams))
_ <- ZIO.logTrace(s"onLost done")
} yield ()
)
Expand Down Expand Up @@ -830,11 +830,12 @@ object Runloop {
endedStreams = this.endedStreams ++ endedStreams
)

def onLost(lost: Set[TopicPartition]): RebalanceEvent =
def onLost(lost: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent =
copy(
wasInvoked = true,
assignedTps = assignedTps -- lost,
lostTps = lostTps ++ lost
lostTps = lostTps ++ lost,
endedStreams = this.endedStreams ++ endedStreams
)
}

Expand Down

0 comments on commit b4f95d3

Please sign in to comment.