Skip to content

Commit

Permalink
fully encapsulate the batch-building in State
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-git committed Oct 28, 2024
1 parent b513596 commit 32f7625
Showing 1 changed file with 13 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import com.evolutiongaming.skafka.Offset
* Receives list of records from [[ReplicateRecords]], groups and optimizes similar or sequential actions, like:
* - two or more `append` or `delete` actions are merged into one
* - optimizes list of records to minimize load on Cassandra, like:
* - aggregate effective actions by processing them from last record down to first
* - aggregate effective actions by processing them from the youngest record down to oldest
* - ignore/drop all actions, which are before last `Purge`
* - ignore all `Mark` actions
* - if `append`(s) are followed by `delete` all `append`(s), except last, are dropped
Expand All @@ -23,17 +23,23 @@ private[journal] sealed abstract class Batch extends Product {
private[journal] object Batch {

def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch] = {
records.reverse.foldLeft(State()) { _.handle(_) }.batches
State(records).batches
}

/** Builds minimal set of actions, which will execute less calls to Cassandra while producing the same result */
private object State {
def apply(records: NonEmptyList[ActionRecord[Action]]): State = {
records.reverse.foldLeft(State()) { _.handle(_) }
}
}

private final case class State(
purge: Option[Purge] = None,
appends: Option[Appends] = None,
delete: Option[Delete] = None,
private val purge: Option[Purge] = None,
private val appends: Option[Appends] = None,
private val delete: Option[Delete] = None,
) {

// Expects records to be provided in revered order, e.g. youngest first
def handle: ActionRecord[Action] => State = {
private def handle: ActionRecord[Action] => State = {
case _ if this.purge.nonEmpty => // ignore all actions before `Purge`
this

Expand Down

0 comments on commit 32f7625

Please sign in to comment.