diff --git a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala index ffaa19366..2774e0e41 100644 --- a/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala +++ b/replicator/src/main/scala/com/evolutiongaming/kafka/journal/replicator/Batch.scala @@ -9,7 +9,7 @@ import com.evolutiongaming.skafka.Offset * Receives list of records from [[ReplicateRecords]], groups and optimizes similar or sequential actions, like: * - two or more `append` or `delete` actions are merged into one * - optimizes list of records to minimize load on Cassandra, like: - * - aggregate effective actions by processing them from last record down to first + * - aggregate effective actions by processing them from the youngest record down to oldest * - ignore/drop all actions, which are before last `Purge` * - ignore all `Mark` actions * - if `append`(s) are followed by `delete` all `append`(s), except last, are dropped @@ -23,17 +23,23 @@ private[journal] sealed abstract class Batch extends Product { private[journal] object Batch { def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch] = { - records.reverse.foldLeft(State()) { _.handle(_) }.batches + State(records).batches + } + + /** Builds minimal set of actions, which will execute less calls to Cassandra while producing the same result */ + private object State { + def apply(records: NonEmptyList[ActionRecord[Action]]): State = { + records.reverse.foldLeft(State()) { _.handle(_) } + } } private final case class State( - purge: Option[Purge] = None, - appends: Option[Appends] = None, - delete: Option[Delete] = None, + private val purge: Option[Purge] = None, + private val appends: Option[Appends] = None, + private val delete: Option[Delete] = None, ) { - // Expects records to be provided in revered order, e.g. youngest first - def handle: ActionRecord[Action] => State = { + private def handle: ActionRecord[Action] => State = { case _ if this.purge.nonEmpty => // ignore all actions before `Purge` this