Skip to content

Commit

Permalink
Merge pull request #8 from bilal-fazlani/use-message-id-store
Browse files Browse the repository at this point in the history
Use message id store
  • Loading branch information
bilal-fazlani authored Jul 13, 2023
2 parents e53c791 + de3a0e1 commit 98dba3e
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 88 deletions.
38 changes: 23 additions & 15 deletions docs/api-reference/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,23 @@ You can send a message to any `NodeId` using `NodeId.send()` API. It takes a `Se

1. these will be sent to all nodes in cluster

### 3. `reply`
### 3. `ask`

From within `receive` function, you can call `reply` api to send a reply message to the source of the current message.
`ask` api is a combination of `send` and `receive`. It sends a message to a remote node and waits for a reply. It takes a `Sendable` & `Receive` message and returns a `Reply` message. It also takes a timeout argument which is the maximum time to wait for a reply. It expects a `zio.json.JsonDecoder` instance for the reply & a `zio.json.JsonEncoder` instance for the request message. `ask` api can be called from within and outside of `receive` function.

<!--codeinclude-->
[Reply](../../examples/echo/src/main/scala/com/example/IODocs.scala) inside_block:Reply
[Ask](../../examples/echo/src/main/scala/com/example/IODocs.scala) inside_block:Ask
<!--/codeinclude-->

`reply` api takes an instance of `Sendable` & `Reply` message which has a `zio.json.JsonEncoder` instance.
1. `MessageId.next` gives next sequential message id

!!! tip
`reply` can be called only inside of receive function. Outside of the `receive` function, you can use `send` api which takes a remote `NodeId` argument.
Use `MessageId.next` to generate a new message id. It is a sequential id generator

### 4. `ask`

`ask` api is a combination of `send` and `receive`. It sends a message to a remote node and waits for a reply. It takes a `Sendable` & `Receive` message and returns a `Reply` message. It also takes a timeout argument which is the maximum time to wait for a reply. It expects a `zio.json.JsonDecoder` instance for the reply & a `zio.json.JsonEncoder` instance for the request message. `ask` api can be called from within and outside of `receive` function.

<!--codeinclude-->
[Ask](../../examples/echo/src/main/scala/com/example/IODocs.scala) inside_block:Ask
<!--/codeinclude-->
!!! important danger
Make sure to use different message ids for different messages. If you use the same message id for different messages, the receiver will not be able to map the response to the request

The `ask` api can return either a successful response or an `AskError`.
The `ask` api can return either a successful response or an `AskError`

<!--codeinclude-->
[AskError](../../zio-maelstrom/src/main/scala/com/bilal-fazlani/zio-maelstrom/MessageSender.scala) inside_block:ask_error
Expand All @@ -161,7 +156,20 @@ Ask error can be one of the following:
[Ask error handling](../../examples/echo/src/main/scala/com/example/ErrorDocs.scala) inside_block:GetErrorMessage
<!--/codeinclude-->

Sender can send an error message if it encounters an error while processing the request message or when request is invalid. You can read more about error messages in the [next section](#error-messages).
Sender can send an error message if it encounters an error while processing the request message or when request is invalid. You can read more about error messages in the [error messages section](#error-messages)

### 4. `reply`

From within `receive` function, you can call `reply` api to send a reply message to the source of the current message.

<!--codeinclude-->
[Reply](../../examples/echo/src/main/scala/com/example/IODocs.scala) inside_block:Reply
<!--/codeinclude-->

`reply` api takes an instance of `Sendable` & `Reply` message which has a `zio.json.JsonEncoder` instance.

!!! tip
`reply` can be called only inside of receive function. Outside of the `receive` function, you can use `send` api which takes a remote `NodeId` argument.

## Error messages

Expand Down Expand Up @@ -212,7 +220,7 @@ ZIO-Maelstrom provides `LinkKv`, `LwwKv`, `SeqKv` & `LinTso` clients to interact
`SeqKv`, `LwwKv` & `LinKv` are all key value stores. They have the same api but different consistency guarantees.

!!! note
`read`, `write` and `cas` apis are all built on top of [`ask`](#4-ask) api. So they can return an `AskError` which you may need to handle. According to [maelstrom documentation](https://github.com/jepsen-io/maelstrom/blob/main/doc/workloads.md#rpc-cas), they can return `KeyDoesNotExist` or `PreconditionFailed` error codes.
`read`, `write` and `cas` apis are all built on top of [`ask`](#3-ask) api. So they can return an `AskError` which you may need to handle. According to [maelstrom documentation](https://github.com/jepsen-io/maelstrom/blob/main/doc/workloads.md#rpc-cas), they can return `KeyDoesNotExist` or `PreconditionFailed` error codes.

!!! tip
key and value of the key value store can be any type that has a `zio.json.JsonCodec` instance
Expand Down
27 changes: 15 additions & 12 deletions examples/echo/src/main/scala/com/example/ErrorDocs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,19 @@ object ErrorDocs:

case class Answer(in_reply_to: MessageId, text: String) extends Reply derives JsonCodec

val askResponse: ZIO[MaelstromRuntime, Nothing, Unit] =
NodeId("g4")
.ask[Answer](Query(1, MessageId(1)), 5.seconds)
.flatMap(answer => logInfo(s"answer: $answer"))
.catchAll {
case t: Timeout => logError(s"timeout: ${t.timeout}")
case d: DecodingFailure => logError(s"decoding failure: ${d.error}")
case e: ErrorMessage =>
val code: ErrorCode = e.code
val text: String = e.text
logError(s"error code: $code, error text: $text")
}
val askResponse: ZIO[MaelstromRuntime, AskError, Unit] = for
msgId <- MessageId.next
answer <- NodeId("g4").ask[Answer](Query(1, msgId), 5.seconds)
_ <- logInfo(s"answer: $answer")
yield ()

askResponse
.catchAll {
case t: Timeout => logError(s"timeout: ${t.timeout}")
case d: DecodingFailure => logError(s"decoding failure: ${d.error}")
case e: ErrorMessage =>
val code: ErrorCode = e.code
val text: String = e.text
logError(s"error code: $code, error text: $text")
}
}
12 changes: 7 additions & 5 deletions examples/echo/src/main/scala/com/example/IODocs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ object IODocs:
}

object Send {
case class Gossip(msg_id: MessageId, numbers: Seq[Int], `type`: String = "gossip")
case class Gossip(numbers: Seq[Int], `type`: String = "gossip")
extends Sendable derives JsonCodec

val messageHandler =
receive[Gossip] {
case msg: Gossip =>
ZIO.foreach(others)(_.send(Gossip(MessageId(5), Seq(1,2)))).unit //(1)!
ZIO.foreach(others)(_.send(Gossip(Seq(1,2)))).unit //(1)!
}

val result = NodeId("n5") send Gossip(MessageId(1), Seq(1,2))
val result = NodeId("n5") send Gossip(Seq(1,2))
}

object Reply {
Expand All @@ -55,8 +55,10 @@ object IODocs:
case class GossipOk(in_reply_to: MessageId, myNumbers: Seq[Int], `type`: String = "gossip_ok")
extends Reply derives JsonCodec

val gosspiResult: ZIO[MessageSender, AskError, GossipOk] =
NodeId("n2").ask[GossipOk](Gossip(MessageId(1), Seq(1,2)), 5.seconds)
val gosspiResult: ZIO[MaelstromRuntime, AskError, GossipOk] =
MessageId.next.flatMap( msgId => //(1)!
NodeId("n2").ask[GossipOk](Gossip(msgId, Seq(1,2)), 5.seconds)
)
}

//format: on
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import java.nio.file.Path
type Services = LinKv & SeqKv & LwwKv & LinTso

// definition {
type MaelstromRuntime = Initialisation & RequestHandler & MessageSender & Services & Logger &
Settings
type MaelstromRuntime = Initialisation & RequestHandler & MessageSender & MessageIdStore &
Services & Logger & Settings
// }

object MaelstromRuntime:
Expand All @@ -29,6 +29,8 @@ object MaelstromRuntime:
inputStream,
OutputChannel.stdOut,
CallbackRegistry.live,
MessageIdStore.live,

// Services
LinKv.live,
SeqKv.live,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.bilalfazlani.zioMaelstrom

import zio.{ZIO, Ref, ZLayer}
import protocol.MessageId

trait MessageIdStore:
def next: ZIO[Any, Nothing, MessageId]

private[zioMaelstrom] object MessageIdStore:
val next: ZIO[MessageIdStore, Nothing, MessageId] = ZIO.serviceWithZIO(_.next)

val live: ZLayer[Any, Nothing, MessageIdStore] =
ZLayer.fromZIO(Ref.make(0)) >>> ZLayer.fromFunction(MessageIdStoreImpl.apply)

private case class MessageIdStoreImpl(ref: Ref[Int]) extends MessageIdStore:
def next: ZIO[Any, Nothing, MessageId] =
ref.updateAndGet(_ + 1).map(MessageId(_))
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,7 @@ object MessageId:
given JsonDecoder[MessageId] = JsonDecoder.int.map(MessageId(_))
given JsonFieldEncoder[MessageId] = JsonFieldEncoder.int.contramap(identity)
given JsonFieldDecoder[MessageId] = JsonFieldDecoder.int.map(MessageId(_))
val random = Random.nextIntBounded(Int.MaxValue).map(MessageId(_))

extension (id: MessageId)
@targetName("messageIdInc")
infix def inc: MessageId = MessageId(id.toInt + 1)
val next = MessageIdStore.next

// format: off
// errorCodes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ private[zioMaelstrom] trait KvService:

private[zioMaelstrom] case class KvImpl(
private val remote: NodeId,
private val sender: MessageSender
private val sender: MessageSender,
private val messageIdStore: MessageIdStore
) extends KvService {

override def read[Key: JsonEncoder, Value: JsonDecoder](
key: Key,
timeout: Duration
): ZIO[Any, AskError, Value] =
MessageId.random.flatMap { messageId =>
messageIdStore.next.flatMap { messageId =>
sender
.ask[KvRead[Key], KvReadOk[Value]](KvRead(key, messageId), remote, timeout)
.map(_.value)
Expand All @@ -45,7 +46,7 @@ private[zioMaelstrom] case class KvImpl(
value: Value,
timeout: Duration
): ZIO[Any, AskError, Unit] =
MessageId.random.flatMap { messageId =>
messageIdStore.next.flatMap { messageId =>
sender
.ask[KvWrite[Key, Value], KvWriteOk](KvWrite(key, value, messageId), remote, timeout)
.unit
Expand All @@ -58,7 +59,7 @@ private[zioMaelstrom] case class KvImpl(
createIfNotExists: Boolean,
timeout: Duration
): ZIO[Any, AskError, Unit] =
MessageId.random.flatMap { messageId =>
messageIdStore.next.flatMap { messageId =>
sender
.ask[CompareAndSwap[Key, Value], CompareAndSwapOk](
CompareAndSwap(key, from, to, createIfNotExists, messageId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ object LinKv:
): ZIO[LinKv, AskError, Unit] =
ZIO.serviceWithZIO[LinKv](_.cas(key, from, to, createIfNotExists, timeout))

private[zioMaelstrom] val live: ZLayer[MessageSender, Nothing, LinKv] = ZLayer.fromZIO(
for
sender <- ZIO.service[MessageSender]
kvImpl = KvImpl(NodeId("lin-kv"), sender)
yield new LinKv:
export kvImpl.*
)
private[zioMaelstrom] val live: ZLayer[MessageSender & MessageIdStore, Nothing, LinKv] =
ZLayer.fromZIO(
for
sender <- ZIO.service[MessageSender]
messageIdStore <- ZIO.service[MessageIdStore]
kvImpl = KvImpl(NodeId("lin-kv"), sender, messageIdStore)
yield new LinKv:
export kvImpl.*
)
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ object LinTso:
def ts(timeout: Duration): ZIO[LinTso, AskError, Int] =
ZIO.serviceWithZIO(_.ts(timeout))

private[zioMaelstrom] val live: ZLayer[MessageSender, Nothing, LinTso] =
private[zioMaelstrom] val live: ZLayer[MessageSender & MessageIdStore, Nothing, LinTso] =
ZLayer.fromFunction(LinTsoImpl.apply)

private case class LinTsoImpl(sender: MessageSender) extends LinTso:
private case class LinTsoImpl(sender: MessageSender, messageIdStore: MessageIdStore) extends LinTso:
def ts(timeout: Duration): ZIO[Any, AskError, Int] =
MessageId.random.flatMap { messageId =>
messageIdStore.next.flatMap { messageId =>
sender.ask[Ts, TsOk](Ts(messageId), NodeId("lin-tso"), timeout).map(_.ts)
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ object LwwKv:
): ZIO[LwwKv, AskError, Unit] =
ZIO.serviceWithZIO[LwwKv](_.cas(key, from, to, createIfNotExists, timeout))

private[zioMaelstrom] val live: ZLayer[MessageSender, Nothing, LwwKv] = ZLayer.fromZIO(
for
sender <- ZIO.service[MessageSender]
kvImpl = KvImpl(NodeId("lww-kv"), sender)
yield new LwwKv:
export kvImpl.*
)
private[zioMaelstrom] val live: ZLayer[MessageSender & MessageIdStore, Nothing, LwwKv] =
ZLayer.fromZIO(
for
sender <- ZIO.service[MessageSender]
messageIdStore <- ZIO.service[MessageIdStore]
kvImpl = KvImpl(NodeId("lww-kv"), sender, messageIdStore)
yield new LwwKv:
export kvImpl.*
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ object SeqKv:
): ZIO[SeqKv, AskError, Unit] =
ZIO.serviceWithZIO[SeqKv](_.cas(key, from, to, createIfNotExists, timeout))

private[zioMaelstrom] val live: ZLayer[MessageSender, Nothing, SeqKv] = ZLayer.fromZIO(
for
sender <- ZIO.service[MessageSender]
kvImpl = KvImpl(NodeId("seq-kv"), sender)
yield new SeqKv:
export kvImpl.*
)
private[zioMaelstrom] val live: ZLayer[MessageSender & MessageIdStore, Nothing, SeqKv] =
ZLayer.fromZIO(
for
sender <- ZIO.service[MessageSender]
messageIdStore <- ZIO.service[MessageIdStore]
kvImpl = KvImpl(NodeId("seq-kv"), sender, messageIdStore)
yield new SeqKv:
export kvImpl.*
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import zio.*
import zio.json.*
import protocol.*

case class KvFake(ref: Ref.Synchronized[Map[Any, Any]]) extends KvService:
case class KvFake(ref: Ref.Synchronized[Map[Any, Any]], messageIdStore: MessageIdStore)
extends KvService:
override def read[Key: JsonEncoder, Value: JsonDecoder](
key: Key,
timeout: Duration
Expand All @@ -31,11 +32,11 @@ case class KvFake(ref: Ref.Synchronized[Map[Any, Any]]) extends KvService:
case Some(`from`) => ZIO.succeed(map + (key -> to))
case None if createIfNotExists => ZIO.succeed(map + (key -> to))
case None =>
MessageId.random.flatMap(messageId =>
messageIdStore.next.flatMap(messageId =>
ZIO.fail(ErrorMessage(messageId, ErrorCode.KeyDoesNotExist, s"Key $key does not exist"))
)
case Some(other) =>
MessageId.random.flatMap(messageId =>
messageIdStore.next.flatMap(messageId =>
ZIO.fail(
ErrorMessage(
messageId,
Expand All @@ -48,23 +49,38 @@ case class KvFake(ref: Ref.Synchronized[Map[Any, Any]]) extends KvService:
}

object KvFake:
val linKv = ZLayer.fromZIO(Ref.Synchronized.make(Map.empty[Any, Any]).map { r =>
val impl = KvFake(r)
new LinKv {
export impl.*
}
})

val seqKv = ZLayer.fromZIO(Ref.Synchronized.make(Map.empty[Any, Any]).map { r =>
val impl = KvFake(r)
new SeqKv {
export impl.*
}
})
private val mapLayer = ZLayer.fromZIO(Ref.Synchronized.make(Map.empty[Any, Any]))

val lwwKv = ZLayer.fromZIO(Ref.Synchronized.make(Map.empty[Any, Any]).map { r =>
val impl = KvFake(r)
new LwwKv {
export impl.*
}
})
val linKv: ZLayer[Any, Nothing, LinKv] = ZLayer.make[LinKv](
ZLayer.fromFunction(KvFake.apply),
mapLayer,
MessageIdStore.live,
ZLayer.fromFunction((fake: KvFake) =>
new LinKv {
export fake.*
}
)
)

val seqKv: ZLayer[Any, Nothing, SeqKv] = ZLayer.make[SeqKv](
ZLayer.fromFunction(KvFake.apply),
mapLayer,
MessageIdStore.live,
ZLayer.fromFunction((fake: KvFake) =>
new SeqKv {
export fake.*
}
)
)

val lwwKv: ZLayer[Any, Nothing, LwwKv] = ZLayer.make[LwwKv](
ZLayer.fromFunction(KvFake.apply),
mapLayer,
MessageIdStore.live,
ZLayer.fromFunction((fake: KvFake) =>
new LwwKv {
export fake.*
}
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ object TestRuntime:
InputStream.queue, // FAKE
InputChannel.live,
CallbackRegistry.live,
MessageIdStore.live,

// Services
KvFake.linKv,
KvFake.seqKv,
Expand Down

0 comments on commit 98dba3e

Please sign in to comment.