Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-git committed Oct 23, 2024
1 parent 78e6567 commit 538fd67
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,64 +20,52 @@ private[journal] object Batch_4_1_0_Alternative_reversed {
def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch_4_1_0_Alternative_reversed] = {

val state = records.reverse.foldLeft(State()) {
// TODO MR discard next 2 lines - match on explosion of types
case (state, record) =>
(state, record.action) match {
case (state, _) if state.purge.nonEmpty =>
state

case (state, purge: Action.Purge) =>
state.copy(
purge = Purge(record.offset, purge.origin, purge.version).some,
)

case (state, delete: Action.Delete) =>
state.delete match {
case Some(younger) =>
// take `origin` and `version` from "older" entity, if it has them
val origin = delete.origin.orElse(younger.origin)
val version = delete.version.orElse(younger.version)
if (younger.to < delete.to) {
state.copy(
delete = Delete(record.offset, delete.to, origin, version).some,
)
} else {
state.copy(
delete = Delete(younger.offset, younger.to, origin, version).some,
)
}
case None =>
state.copy(
delete = Delete(record.offset, delete.to, delete.origin, delete.version).some,
)
}
// ignore all actions before `Purge`
case (state, _) if state.purge.nonEmpty =>
state

case (state, ActionRecord(purge: Action.Purge, po)) =>
state.copy(
purge = Purge(po.offset, purge.origin, purge.version).some,
)

case (state, ActionRecord(delete: Action.Delete, po)) =>
val delete_ = state.delete match {
case Some(younger) =>
// take `origin` and `version` from "older" entity, if it has them
val origin = delete.origin.orElse(younger.origin)
val version = delete.version.orElse(younger.version)
// make `Delete` action with largest `seqNr` and largest `offset`
if (younger.to < delete.to) Delete(po.offset, delete.to, origin, version)
else younger.copy(origin = origin, version = version)

case (state, append: Action.Append) if state.delete.forall(_.to.value <= append.range.to) =>
state.appends match {
case Some(appends) =>
Appends(record.offset, NonEmptyList.of(ActionRecord(append, record.partitionOffset))).some
state.copy(
appends = appends
.copy(
records = ActionRecord(append, record.partitionOffset) :: appends.records,
)
.some,
)
case None =>
state.copy(
appends = Appends(record.offset, NonEmptyList.of(ActionRecord(append, record.partitionOffset))).some,
)
}
case None =>
Delete(po.offset, delete.to, delete.origin, delete.version)
}
state.copy(
delete = delete_.some,
)

case (state, _) => // ignore actions `Mark` and `Append`, if it would get deleted
state
case (state, ActionRecord(append: Action.Append, po)) if state.delete.forall(_.to.value <= append.range.to) =>
val appends = state.appends match {
case Some(appends) =>
appends.copy(records = ActionRecord(append, po) :: appends.records)
case None =>
Appends(po.offset, NonEmptyList.of(ActionRecord(append, po)))
}
state.copy(
appends = appends.some,
)

case (state, _) =>
// ignore `Action.Append`, if it would get deleted and ignore `Action.Mark`
state
}

state.batches
}

private case class State(
private final case class State(
purge: Option[Purge] = None,
appends: Option[Appends] = None,
delete: Option[Delete] = None,
Expand Down Expand Up @@ -109,6 +97,7 @@ private[journal] object Batch_4_1_0_Alternative_reversed {
case None => this.delete
}

// apply action batches in order: `Purge`, `Append`s and `Delete`
List(purge, appends, delete).flatten
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import com.evolutiongaming.skafka.Offset
* Receives list of records from [[ReplicateRecords]], groups and optimizes similar or sequential actions, like:
* - two or more `append` or `delete` actions are merged into one
* - optimizes list of records to minimize load on Cassandra, like:
* - aggregate effective actions by processing them from last record down to first
* - ignore/drop all actions, which are before last `Purge`
* - ignore all `Mark` actions
* - if `append`(s) are followed by `delete` all `append`(s), except last, are dropped
* - if `append`(s) are followed by `prune`, then all `append`(s) are dropped
* - `mark` actions are ignored
* - at the end, apply in following order: `purge`, `append`s, `delete`
* - at the end, apply all aggregated batches following order: `purge`, `append`s, `delete`
*/
private[journal] sealed abstract class Batch extends Product {

Expand All @@ -24,64 +25,52 @@ private[journal] object Batch {
def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch] = {

val state = records.reverse.foldLeft(State()) {
// TODO MR discard next 2 lines - match on explosion of types
case (state, record) =>
(state, record.action) match {
case (state, _) if state.purge.nonEmpty =>
state

case (state, purge: Action.Purge) =>
state.copy(
purge = Purge(record.offset, purge.origin, purge.version).some,
)

case (state, delete: Action.Delete) =>
state.delete match {
case Some(younger) =>
// take `origin` and `version` from "older" entity, if it has them
val origin = delete.origin.orElse(younger.origin)
val version = delete.version.orElse(younger.version)
if (younger.to < delete.to) {
state.copy(
delete = Delete(record.offset, delete.to, origin, version).some,
)
} else {
state.copy(
delete = Delete(younger.offset, younger.to, origin, version).some,
)
}
case None =>
state.copy(
delete = Delete(record.offset, delete.to, delete.origin, delete.version).some,
)
}
// ignore all actions before `Purge`
case (state, _) if state.purge.nonEmpty =>
state

case (state, ActionRecord(purge: Action.Purge, po)) =>
state.copy(
purge = Purge(po.offset, purge.origin, purge.version).some,
)

case (state, ActionRecord(delete: Action.Delete, po)) =>
val delete_ = state.delete match {
case Some(younger) =>
// take `origin` and `version` from "older" entity, if it has them
val origin = delete.origin.orElse(younger.origin)
val version = delete.version.orElse(younger.version)
// make `Delete` action with largest `seqNr` and largest `offset`
if (younger.to < delete.to) Delete(po.offset, delete.to, origin, version)
else younger.copy(origin = origin, version = version)

case (state, append: Action.Append) if state.delete.forall(_.to.value <= append.range.to) =>
state.appends match {
case Some(appends) =>
Appends(record.offset, NonEmptyList.of(ActionRecord(append, record.partitionOffset))).some
state.copy(
appends = appends
.copy(
records = ActionRecord(append, record.partitionOffset) :: appends.records,
)
.some,
)
case None =>
state.copy(
appends = Appends(record.offset, NonEmptyList.of(ActionRecord(append, record.partitionOffset))).some,
)
}
case None =>
Delete(po.offset, delete.to, delete.origin, delete.version)
}
state.copy(
delete = delete_.some,
)

case (state, _) => // ignore actions `Mark` and `Append`, if it would get deleted
state
case (state, ActionRecord(append: Action.Append, po)) if state.delete.forall(_.to.value <= append.range.to) =>
val appends = state.appends match {
case Some(appends) =>
appends.copy(records = ActionRecord(append, po) :: appends.records)
case None =>
Appends(po.offset, NonEmptyList.of(ActionRecord(append, po)))
}
state.copy(
appends = appends.some,
)

case (state, _) =>
// ignore `Action.Append`, if it would get deleted and ignore `Action.Mark`
state
}

state.batches
}

private case class State(
private final case class State(
purge: Option[Purge] = None,
appends: Option[Appends] = None,
delete: Option[Delete] = None,
Expand Down Expand Up @@ -113,6 +102,7 @@ private[journal] object Batch {
case None => this.delete
}

// apply action batches in order: `Purge`, `Append`s and `Delete`
List(purge, appends, delete).flatten
}
}
Expand Down

0 comments on commit 538fd67

Please sign in to comment.