Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Apr 2, 2024
1 parent 90fa193 commit 865665b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ final case class RebalanceListener(
)

def runOnExecutor(executor: Executor): RebalanceListener = RebalanceListener(
(assigned, consumer) => onAssigned(assigned, consumer).onExecutor(executor),
(revoked, consumer) => onRevoked(revoked, consumer).onExecutor(executor),
(lost, consumer) => onLost(lost, consumer).onExecutor(executor)
assigned => onAssigned(assigned).onExecutor(executor),
revoked => onRevoked(revoked).onExecutor(executor),
lost => onLost(lost).onExecutor(executor)
)

def toKafka(
Expand Down Expand Up @@ -67,14 +67,14 @@ final case class RebalanceListener(

object RebalanceListener {
def apply(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
onRevoked: (Set[TopicPartition], RebalanceConsumer) => Task[Unit]
onAssigned: Set[TopicPartition] => Task[Unit],
onRevoked: Set[TopicPartition] => Task[Unit]
): RebalanceListener =
RebalanceListener(onAssigned, onRevoked, onRevoked)

val noop: RebalanceListener = RebalanceListener(
(_, _) => ZIO.unit,
(_, _) => ZIO.unit,
(_, _) => ZIO.unit
_ => ZIO.unit,
_ => ZIO.unit,
_ => ZIO.unit
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private[consumer] final class Runloop private (
// - updates `lastRebalanceEvent`
//
val recordRebalanceRebalancingListener = RebalanceListener(
onAssigned = (assignedTps, _) =>
onAssigned = assignedTps =>
for {
rebalanceEvent <- lastRebalanceEvent.get
_ <- ZIO.logDebug {
Expand All @@ -215,7 +215,7 @@ private[consumer] final class Runloop private (
_ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd))
_ <- ZIO.logTrace("onAssigned done")
} yield (),
onRevoked = (revokedTps, _) =>
onRevoked = revokedTps =>
for {
rebalanceEvent <- lastRebalanceEvent.get
_ <- ZIO.logDebug {
Expand All @@ -229,7 +229,7 @@ private[consumer] final class Runloop private (
_ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd))
_ <- ZIO.logTrace("onRevoked done")
} yield (),
onLost = (lostTps, _) =>
onLost = lostTps =>
for {
_ <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
rebalanceEvent <- lastRebalanceEvent.get
Expand Down

0 comments on commit 865665b

Please sign in to comment.