Skip to content

Commit

Permalink
Implement EventsBySliceQuery in PersistenceTestKitReadJournal (java dsl)
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning committed Feb 8, 2025
1 parent 40af227 commit ce08497
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import pekko.persistence.query.javadsl.{
ReadJournal
}
import pekko.persistence.query.typed
import pekko.persistence.query.typed.javadsl.CurrentEventsBySliceQuery
import pekko.stream.javadsl.Source
import pekko.persistence.query.typed.javadsl.{ CurrentEventsBySliceQuery, EventsBySliceQuery }
import pekko.persistence.testkit.query.scaladsl
import pekko.stream.javadsl.Source

object PersistenceTestKitReadJournal {
val Identifier = "pekko.persistence.testkit.query"
Expand All @@ -40,7 +40,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with CurrentEventsBySliceQuery
with EventsByTagQuery {
with EventsByTagQuery
with EventsBySliceQuery {

override def eventsByPersistenceId(
persistenceId: String,
Expand All @@ -67,6 +68,13 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
delegate.eventsByTag(tag, offset).asJava

override def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[typed.EventEnvelope[Event], NotUsed] =
delegate.eventsBySlices(entityType, minSlice, maxSlice, offset).asJava

override def sliceForPersistenceId(persistenceId: String): Int =
delegate.sliceForPersistenceId(persistenceId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@

package org.apache.pekko.persistence.testkit.query

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.persistence.Persistence
import pekko.persistence.query.NoOffset
import pekko.persistence.testkit.internal.InMemStorageExtension
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.Persistence
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.NoOffset
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.internal.InMemStorageExtension
import pekko.persistence.testkit.query.javadsl.{ PersistenceTestKitReadJournal => JavaPersistenceTestKitReadJournal }
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
Expand All @@ -39,6 +38,9 @@ import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.immutable.Seq
import scala.concurrent.duration._

object EventsBySliceSpec {
val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
Expand Down Expand Up @@ -81,8 +83,16 @@ class EventsBySliceSpec

implicit val classic: pekko.actor.ActorSystem = system.classicSystem

val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private val persistenceQuery = PersistenceQuery(system)

private val queries =
persistenceQuery.readJournalFor[PersistenceTestKitReadJournal](
PersistenceTestKitReadJournal.Identifier)

private val queriesJava =
persistenceQuery.getReadJournalFor(
classOf[JavaPersistenceTestKitReadJournal],
JavaPersistenceTestKitReadJournal.Identifier)

def setup(persistenceId: String): ActorRef[Command] = {
val probe = createTestProbe[Done]()
Expand Down Expand Up @@ -131,6 +141,18 @@ class EventsBySliceSpec
probe.expectNext("c-4")
}

"find new events (Java DSL)" in {
val ackProbe = createTestProbe[Done]()
val ref = setup("c")
val src = queriesJava.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset).asScala
val probe = src.map(_.event).runWith(TestSink.probe[String]).request(5).expectNext("c-1", "c-2", "c-3")

ref ! Command("c-4", ackProbe.ref)
ackProbe.expectMessage(Done)

probe.expectNext("c-4")
}

"find new events after batched setup" in {
val ackProbe = createTestProbe[Done]()
val ref = setupBatched("d")
Expand Down

0 comments on commit ce08497

Please sign in to comment.