Skip to content

Commit

Permalink
Implemented SkunkDriver for CQRS domain
Browse files Browse the repository at this point in the history
  • Loading branch information
hnaderi committed Dec 6, 2022
1 parent bb22643 commit f7134ab
Show file tree
Hide file tree
Showing 24 changed files with 577 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .jvmopts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-Xms512M
-Xmx4G
-Xmx6G
-Xss2M
6 changes: 6 additions & 0 deletions examples/src/main/scala/StomatonExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import cats.data.*
import cats.effect.IO
import cats.implicits.*
import edomata.backend.Backend
import edomata.skunk.*
import edomata.syntax.all.*
import io.circe.Codec

Expand Down Expand Up @@ -65,6 +66,11 @@ object StomatonExample {
val driver: edomata.backend.cqrs.StorageDriver[IO, Codec] = ???
val backend = Backend.builder(FooService).use(driver).build

given BackendCodec[Foo] = ???
given BackendCodec[Int] = ???
val backend2 =
Backend.builder(FooService).use(SkunkDriver2[IO]("example", ???)).build

backend.use { b =>
val srv = b.compile(FooService().liftTo[IO])

Expand Down
6 changes: 3 additions & 3 deletions modules/backend-tests/js/src/main/scala/StorageSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import edomata.backend.*
import munit.CatsEffectSuite
import munit.Location

abstract class StorageSuite[S, E, R, N](
storage: Resource[IO, Backend[IO, S, E, R, N]],
abstract class StorageSuite[Res](
storage: Resource[IO, Res],
suiteName: String
) extends CatsEffectSuite {
def check(name: String)(f: Backend[IO, S, E, R, N] => IO[Unit])(using
def check(name: String)(f: Res => IO[Unit])(using
Location
) = test(s"${suiteName}: ${name}")(storage.use(f))
}
6 changes: 3 additions & 3 deletions modules/backend-tests/jvm/src/main/scala/StorageSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import edomata.backend.*
import munit.CatsEffectSuite
import munit.Location

abstract class StorageSuite[S, E, R, N](
storage: Resource[IO, Backend[IO, S, E, R, N]],
abstract class StorageSuite[Res](
storage: Resource[IO, Res],
suiteName: String
) extends CatsEffectSuite {
private val storageFixture = ResourceSuiteLocalFixture("Storage", storage)

final override def munitFixtures = List(storageFixture)

def check(name: String)(f: Backend[IO, S, E, R, N] => IO[Unit])(using
def check(name: String)(f: Res => IO[Unit])(using
Location
) = test(s"${suiteName}: ${name}")(f(storageFixture()))
}
91 changes: 91 additions & 0 deletions modules/backend-tests/shared/src/main/scala/CqrsSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2021 Hossein Naderi
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package tests

import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.std.UUIDGen
import cats.implicits.*
import edomata.backend.cqrs.*
import edomata.core.*
import edomata.syntax.all.*

import java.time.Instant

abstract class CqrsSuite[S, R, N](
backend: Resource[IO, Backend[IO, Int, String, Int]],
name: String
) extends StorageSuite(backend, name) {
import TestCQRSModel.given_StateModelTC_State

private val dsl = TestCQRSDomain.dsl
private val rndString = UUIDGen.randomString[IO]

check("inserts state") { b =>
val srv = b.compile(dsl.set(2))
for {
aggId <- randomString
cmdId <- randomString
_ <- srv(CommandMessage(cmdId, Instant.EPOCH, aggId, 0))
.assertEquals(Right(()))
_ <- b.repository
.get(aggId)
.assertEquals(AggregateS(version = 0, state = 2))
} yield ()
}

check("updates existing state") { b =>
val srv = b.compile(dsl.router(i => dsl.set(i)))
for {
aggId <- randomString

cmdId <- randomString
_ <- srv(CommandMessage(cmdId, Instant.EPOCH, aggId, 2))

cmdId2 <- randomString
_ <- srv(CommandMessage(cmdId2, Instant.EPOCH, aggId, 5))
.assertEquals(Right(()))

_ <- b.repository
.get(aggId)
.assertEquals(AggregateS(version = 1, state = 5))
} yield ()
}

check("publishes notifications") { b =>
val srv = b.compile(dsl.publish(1, 2, 3))
for {
aggId <- randomString
cmdId <- randomString
_ <- srv(CommandMessage(cmdId, Instant.EPOCH, aggId, 0))
.assertEquals(Right(()))
_ <- b.repository.get(aggId).assertEquals(AggregateS(0, 0))
_ <- b.outbox.read
.filter(_.streamId == aggId)
.map(_.data)
.compile
.toList
.assertEquals(List(1, 2, 3))
} yield ()
}

}

object TestCQRSModel extends CQRSModel[Int, String] {
def initial: Int = 0
}
val TestCQRSDomain = TestCQRSModel.domain[Int, Int]
3 changes: 1 addition & 2 deletions modules/backend/src/main/scala/OutboxConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ package edomata.backend
import cats.Monad
import cats.data.NonEmptyChain
import cats.implicits.*
import edomata.backend.Backend
import edomata.backend.OutboxItem
import fs2.Stream
import fs2.Stream.*

object OutboxConsumer {
def apply[F[_], S, E, R, N](
backend: Backend[F, S, E, R, N]
backend: eventsourcing.Backend[F, S, E, R, N]
)(
run: edomata.backend.OutboxItem[N] => F[Unit]
)(using F: Monad[F]): Stream[F, Nothing] =
Expand Down
41 changes: 22 additions & 19 deletions modules/backend/src/main/scala/cqrs/Backend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,30 @@ trait Backend[F[_], S, R, N] {
final class BackendBuilder[F[_]: Async, Codec[_], C, S, R, N] private[cqrs] (
driver: Resource[F, StorageDriver[F, Codec]],
domain: CQRSDomain[C, S, R, N],
snapshot: StorageDriver[F, Codec] => Resource[F, SnapshotStore[F, S]],
commandCache: Option[Resource[F, CommandStore[F]]],
val maxRetry: Int = 5,
val retryInitialDelay: FiniteDuration = 2.seconds
)(using StateModelTC[S]) {
private def copy(
driver: Resource[F, StorageDriver[F, Codec]] = driver,
domain: CQRSDomain[C, S, R, N] = domain,
snapshot: StorageDriver[F, Codec] => Resource[F, SnapshotStore[F, S]] =
snapshot,
commandCache: Option[Resource[F, CommandStore[F]]] = commandCache,
maxRetry: Int = maxRetry,
retryInitialDelay: FiniteDuration = retryInitialDelay
) = new BackendBuilder(
driver = driver,
domain = domain,
snapshot = snapshot,
maxRetry = maxRetry,
commandCache = commandCache,
retryInitialDelay = retryInitialDelay
)

def disableCache: BackendBuilder[F, Codec, C, S, R, N] = ???
def disableCache: BackendBuilder[F, Codec, C, S, R, N] =
copy(commandCache = None)

def withCommandCache(
cache: Resource[F, CommandStore[F]]
): BackendBuilder[F, Codec, C, S, R, N] = ???
): BackendBuilder[F, Codec, C, S, R, N] = copy(commandCache = Some(cache))

def withCommandCache(
cache: CommandStore[F]
Expand All @@ -75,27 +72,34 @@ final class BackendBuilder[F[_]: Async, Codec[_], C, S, R, N] private[cqrs] (

def withCommandCacheSize(
maxCommandsToCache: Int
): BackendBuilder[F, Codec, C, S, R, N] = ???

def inMemSnapshot(
maxInMem: Int = 1000
): BackendBuilder[F, Codec, C, S, R, N] =
???

def withSnapshot(
s: Resource[F, SnapshotStore[F, S]]
): BackendBuilder[F, Codec, C, S, R, N] = ???
): BackendBuilder[F, Codec, C, S, R, N] = copy(commandCache =
Some(Resource.eval(CommandStore.inMem(maxCommandsToCache)))
)

def withRetryConfig(
maxRetry: Int = maxRetry,
retryInitialDelay: FiniteDuration = retryInitialDelay
): BackendBuilder[F, Codec, C, S, R, N] =
???
copy(maxRetry = maxRetry, retryInitialDelay = retryInitialDelay)

def build(using
state: Codec[S],
notifs: Codec[N]
): Resource[F, Backend[F, S, R, N]] = ???
): Resource[F, Backend[F, S, R, N]] = for {
d <- driver
storage <- d.build[S, N, R]
} yield new Backend[F, S, R, N] {

override def compile: CommandHandler[F, S, N] =
CommandHandler.withRetry(storage.repository, maxRetry, retryInitialDelay)

override def outbox: OutboxReader[F, N] = storage.outbox

override def repository: RepositoryReader[F, S] = storage.repository

override def updates: NotificationsConsumer[F] = storage.updates

}
}

final class PartialBackendBuilder[C, S, R, N] private[backend] (
Expand All @@ -117,7 +121,6 @@ final class PartialBackendBuilder[C, S, R, N] private[backend] (
new BackendBuilder(
driver,
domain,
snapshot = _ => Resource.eval(SnapshotStore.inMem(1000)),
commandCache = Some(Resource.eval(CommandStore.inMem(1000)))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ object CommandHandler {
repository
.save(cmd, version, newState, out.notifications) >> voidF
case Left(reasons) =>
reasons.asLeft.pure
NonEmptyChain
.fromChain(out.notifications)
.fold(Monad[F].unit)(
repository.notify(cmd, _)
)
.as(reasons.asLeft)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,33 @@

package edomata.backend.cqrs

import cats.effect.Concurrent
import cats.effect.std.Queue
import cats.implicits.*
import fs2.Stream

trait NotificationsConsumer[F[_]] {
def outbox: Stream[F, Unit]
def state: Stream[F, Unit]
}

trait NotificationsPublisher[F[_]] {
def notifyOutbox: F[Unit]
def notifyState: F[Unit]
}

trait Notifications[F[_]]
extends NotificationsConsumer[F],
NotificationsPublisher[F]

object Notifications {
def apply[F[_]: Concurrent]: F[Notifications[F]] = for {
o <- Queue.circularBuffer[F, Unit](1)
s <- Queue.circularBuffer[F, Unit](1)
} yield new {
def outbox: Stream[F, Unit] = Stream.fromQueueUnterminated(o, 1)
def state: Stream[F, Unit] = Stream.fromQueueUnterminated(s, 1)
def notifyOutbox: F[Unit] = o.offer(())
def notifyState: F[Unit] = s.offer(())
}
}
9 changes: 6 additions & 3 deletions modules/backend/src/main/scala/cqrs/Repository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@ package cqrs
import cats.data.*
import edomata.core.*

trait Repository[F[_], S, E] {
trait Repository[F[_], S, E] extends RepositoryReader[F, S] {
def load(cmd: CommandMessage[?]): F[AggregateState[S]]

def get(id: StreamId): F[AggregateS[S]]

def save(
ctx: CommandMessage[?],
version: SeqNr,
newState: S,
events: Chain[E]
): F[Unit]

def notify(
ctx: CommandMessage[?],
notifications: NonEmptyChain[E]
): F[Unit]
}
7 changes: 5 additions & 2 deletions modules/backend/src/main/scala/cqrs/Storage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package edomata.backend
package cqrs

import cats.effect.kernel.Resource
import edomata.core.StateModelTC

final case class Storage[F[_], S, N, R](
repository: Repository[F, S, N],
Expand All @@ -26,7 +27,9 @@ final case class Storage[F[_], S, N, R](
)

trait StorageDriver[F[_], Codec[_]] {
def build[S: Codec, N: Codec, R](
snapshot: SnapshotStore[F, S]
def build[S, N, R](using
StateModelTC[S],
Codec[S],
Codec[N]
): Resource[F, Storage[F, S, N, R]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

package edomata.backend
package eventsourcing

import cats.effect.Concurrent
import cats.effect.std.Queue
Expand Down
4 changes: 4 additions & 0 deletions modules/backend/src/main/scala/eventsourcing/deprecated.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ type StorageDriver[F[_], Codec[_]] = eventsourcing.StorageDriver[F, Codec]
type AggregateState[S, E, R] = eventsourcing.AggregateState[S, E, R]
val AggregateState = eventsourcing.AggregateState
type CommandState[S, E, R] = eventsourcing.CommandState[S, E, R]

type NotificationsConsumer[F[_]] = eventsourcing.NotificationsConsumer[F]
type NotificationsPublisher[F[_]] = eventsourcing.NotificationsPublisher[F]
type Notifications[F[_]] = eventsourcing.Notifications[F]
Loading

0 comments on commit f7134ab

Please sign in to comment.