Skip to content

Commit

Permalink
reimplement batching in a way to preserve offset ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Nov 4, 2024
1 parent 24773ed commit d81d370
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,103 +34,102 @@ private[journal] sealed abstract class Batch extends Product {
private[journal] object Batch {

def of(records: NonEmptyList[ActionRecord[Action]]): List[Batch] = {
State(records).batches
records
.reverse
.foldLeft { State.empty } { State.fold }
.batches
}

/** Builds a minimal set of actions that results in fewer calls to Cassandra while producing the same result */
private object State {
def apply(records: NonEmptyList[ActionRecord[Action]]): State = {
records.reverse.foldLeft(State()) { _.handle(_) }
}
/** Accumulator used while folding action records into batches.
  *
  * @param batches batches built so far; `prepend` and `replace` both put the new batch at the head,
  *                so the head is always the most recently added batch
  */
private final case class State(batches: List[Batch]) {

  /** The batch at the head of the list, or `None` when no batch has been added yet. */
  def next: Option[Batch] = batches.headOption

  /** The first `Delete` batch found when scanning from the head, if any. */
  def delete: Option[Delete] = batches.collectFirst { case d: Delete => d }

  /** Returns a new state with `batch` placed in front of the existing batches. */
  def prepend(batch: Batch): State = new State(batch :: batches)

  /** Returns a new state in which the head batch is swapped for `batch`.
    *
    * Uses `drop(1)` instead of `tail`: `tail` throws on an empty list, whereas `drop(1)` is total,
    * so calling this on an empty state simply behaves like `prepend`.
    */
  def replace(batch: Batch): State = new State(batch :: batches.drop(1))

}

private final case class State(
private val purge: Option[Purge] = None,
private val appends: Option[Appends] = None,
private val delete: Option[Delete] = None,
) {
// Expects records to be provided in reversed order, e.g., youngest first
private def handle: ActionRecord[Action] => State = {
case _ if this.purge.nonEmpty => // ignore all actions before `Purge`
this
private object State {

val empty: State = State(List.empty)

def fold(state: State, event: ActionRecord[Action]): State = event match {

case ActionRecord(_: Action.Mark, _) =>
this
case ActionRecord(_: Action.Mark, _) => state

case ActionRecord(purge: Action.Purge, partitionOffset: PartitionOffset) =>
handlePurge(purge, partitionOffset)
def purgeBatch = Purge(partitionOffset.offset, purge.origin, purge.version)

state.next match {
case Some(_: Purge) => state
case Some(_) => state.prepend(purgeBatch)
case None => state.prepend(purgeBatch)
}

case ActionRecord(delete: Action.Delete, partitionOffset: PartitionOffset) =>
handleDelete(delete, partitionOffset)
def deleteBatch(delete: Action.Delete) =
Delete(partitionOffset.offset, delete.to, delete.origin, delete.version)

state.next match {

case Some(_: Purge) => state
case None => state.prepend(deleteBatch(delete))

case Some(_: Appends) =>
// if `delete` included in `state.delete` then ignore it
val delete1 = state.delete match {
case None => delete.some
case Some(delete1) => if (delete1.to.value < delete.to.value) delete.some else None
}
delete1 match {
case Some(delete) => state.prepend(deleteBatch(delete))
case None => state
}

case Some(next: Delete) =>
if (delete.header.to.value < next.to.value) state
// if `delete` includes `next` then replace `next` with `delete`
else state.replace(deleteBatch(delete))
}

case ActionRecord(append: Action.Append, partitionOffset: PartitionOffset) =>
handleAppend(append, partitionOffset)
}
state.next match {

private def handlePurge(purge: Action.Purge, partitionOffset: PartitionOffset): State = {
this.copy(
purge = Purge(partitionOffset.offset, purge.origin, purge.version).some,
)
}
case Some(_: Purge) => state

private def handleDelete(delete: Action.Delete, partitionOffset: PartitionOffset): State = {
val delete_ = this.delete match {
case Some(younger) =>
// take `origin` and `version` from "older" entity, if it has them
val origin = delete.origin.orElse(younger.origin)
val version = delete.version.orElse(younger.version)
// make `Delete` action with largest `seqNr` and largest `offset`
if (younger.to < delete.to) Delete(partitionOffset.offset, delete.to, origin, version)
else younger.copy(origin = origin, version = version)

case None =>
Delete(partitionOffset.offset, delete.to, delete.origin, delete.version)
}
this.copy(
delete = delete_.some,
)
}
case Some(next: Appends) =>
val append1 =
// if `append` deleted by `state.delete` then ignore it
state.delete match {
case Some(delete) => if (delete.to.value < append.range.to) append.some else None
case None => append.some
}

private def handleAppend(append: Action.Append, partitionOffset: PartitionOffset): State = {
// ignore `Action.Append`, if it would get deleted
if (this.delete.forall(_.to.value <= append.range.to)) {
val appends = this.appends match {
case Some(appends) =>
appends.copy(records = ActionRecord(append, partitionOffset) :: appends.records)
case None =>
Appends(partitionOffset.offset, NonEmptyList.of(ActionRecord(append, partitionOffset)))
}
this.copy(
appends = appends.some,
)
} else {
this
}
}
append1 match {
case Some(append) =>
val record = ActionRecord(append, partitionOffset)
val appends = Appends(next.offset, record :: next.records)
// replace head (aka [state.next]) with new Appends, i.e. merge `append` with `next`
state.replace(appends)

def batches: List[Batch] = {
// we can drop first `append`, if `deleteTo` will discard it AND there is at least one more `append`
// we have to preserve one `append` to store latest `seqNr` and populate `expireAfter`
val appends = {
this.appends.flatMap { appends =>
val deleteTo = this.delete.map(_.to.value)
val records = appends.records
val actions =
if (deleteTo.contains(records.head.action.range.to)) NonEmptyList.fromList(records.tail).getOrElse(records)
else records
appends.copy(records = actions).some
}
}
case None => state
}

// if `delete` was not last action, adjust `delete`'s batch offset to update `metajournal` correctly
val delete = appends match {
case Some(appends) => this.delete.map(delete => delete.copy(offset = delete.offset max appends.offset))
case None => this.delete
}
case Some(next: Delete) =>
val record = ActionRecord(append, partitionOffset)
val appends = Appends(partitionOffset.offset, NonEmptyList.one(record))
state.prepend(appends)

// apply action batches in order: `Purge`, `Append`s and `Delete`
List(purge, appends, delete).flatten
case None =>
val record = ActionRecord(append, partitionOffset)
val appends = Appends(partitionOffset.offset, NonEmptyList.one(record))
state.prepend(appends)
}
}

}

final case class Appends(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
(Nel.of(delete(offset = 1, to = 1), mark(offset = 2)), List(deletes(offset = 1, to = 1))),
(
Nel.of(delete(offset = 1, to = 1), append(offset = 2, seqNr = 2)),
List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 2, to = 1)),
List(deletes(offset = 1, to = 1), appends(2, append(offset = 2, seqNr = 2))),
),
(
Nel.of(append(offset = 1, seqNr = 2), delete(offset = 2, to = 1)),
Expand All @@ -60,7 +60,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
),
(
Nel.of(append(offset = 1, seqNr = 1), delete(offset = 2, to = 1), append(offset = 3, seqNr = 2)),
List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 3, to = 1)),
List(appends(1, append(offset = 1, seqNr = 1)), deletes(offset = 2, to = 1), appends(3, append(offset = 3, seqNr = 2))),
),
(
Nel.of(
Expand All @@ -69,7 +69,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
append(offset = 3, seqNr = 2),
delete(offset = 4, to = 2, origin = "origin2"),
),
List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin1")),
List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin2")),
),
(
Nel.of(
Expand All @@ -78,7 +78,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
append(offset = 3, seqNr = 2),
delete(offset = 4, to = 2),
),
List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin")),
List(appends(3, append(offset = 3, seqNr = 2)), deletes(offset = 4, to = 2)),
),
(
Nel.of(
Expand All @@ -87,7 +87,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
delete(offset = 3, to = 1, origin = "origin1"),
delete(offset = 4, to = 2, origin = "origin2"),
),
List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin1")),
List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 4, to = 2, origin = "origin2")),
),
(
Nel.of(
Expand All @@ -114,7 +114,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
delete(offset = 5, to = 2),
mark(offset = 6),
),
List(deletes(offset = 5, to = 2, origin = "origin")),
List(deletes(offset = 5, to = 2)),
),
(
Nel.of(
Expand All @@ -124,7 +124,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
delete(offset = 3, to = 2),
append(offset = 4, seqNr = 3),
),
List(appends(4, append(offset = 4, seqNr = 3)), deletes(offset = 4, to = 2)),
List(appends(2, append(offset = 2, seqNr = 2)), deletes(offset = 3, to = 2), appends(4, append(offset = 4, seqNr = 3))),
),
(
Nel.of(
Expand All @@ -135,7 +135,7 @@ class BatchSpec extends AnyFunSuite with Matchers {
delete(offset = 4, to = 3),
append(offset = 5, seqNr = 4),
),
List(appends(5, append(offset = 5, seqNr = 4)), deletes(offset = 5, to = 3)),
List(appends(3, append(offset = 3, seqNr = 3)), deletes(offset = 4, to = 3), appends(5, append(offset = 5, seqNr = 4))),
),
(
Nel.of(
Expand All @@ -148,13 +148,13 @@ class BatchSpec extends AnyFunSuite with Matchers {
mark(offset = 6),
),
List(
appends(offset = 1, append(offset = 1, seqNr = 2)),
deletes(offset = 3, to = 1),
appends(
offset = 5,
append(offset = 1, seqNr = 2),
append(offset = 4, seqNr = 3),
append(offset = 5, seqNr = 4),
),
deletes(offset = 5, to = 1),
),
),
(
Expand All @@ -170,13 +170,13 @@ class BatchSpec extends AnyFunSuite with Matchers {
),
List(
appends(
offset = 7,
offset = 5,
append(offset = 2, seqNr = 3),
append(offset = 4, seqNr = 4),
append(offset = 5, seqNr = 5),
append(offset = 7, seqNr = 6),
),
deletes(offset = 7, to = 2, origin = "origin"),
deletes(offset = 6, to = 2),
appends(offset = 7, append(offset = 7, seqNr = 6)),
),
),
(
Expand All @@ -192,14 +192,14 @@ class BatchSpec extends AnyFunSuite with Matchers {
),
List(
appends(
offset = 7,
offset = 5,
append(offset = 1, seqNr = 3, seqNrs = 4),
append(offset = 2, seqNr = 5),
append(offset = 4, seqNr = 6),
append(offset = 5, seqNr = 7),
append(offset = 7, seqNr = 8),
),
deletes(offset = 7, to = 3),
deletes(offset = 6, to = 3),
appends(offset = 7, append(offset = 7, seqNr = 8)),
),
),
(Nel.of(purge(offset = 0)), List(purges(offset = 0))),
Expand Down

0 comments on commit d81d370

Please sign in to comment.