Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New type Transport and replace WebsocketJS #163

Merged
merged 1 commit into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ lazy val didExperiments = crossProject(JSPlatform, JVMPlatform)
.jsConfigure(scalaJSViteConfigure) // Because of didJS now uses NPM libs
.configure(docConfigure)

//TODO Rename did-extra to did-framework
lazy val didExtra = crossProject(JSPlatform, JVMPlatform)
.in(file("did-extra"))
.configure(notYetPublishedConfigure) // FIXME
Expand All @@ -332,9 +333,8 @@ lazy val didExtra = crossProject(JSPlatform, JVMPlatform)
libraryDependencies += D.zioMunitTest.value,
)
.dependsOn(did % "compile;test->test")
.jvmSettings(
libraryDependencies += D.ziohttp.value,
)
.jvmSettings(libraryDependencies += D.ziohttp.value)
.jsSettings(libraryDependencies += D.dom.value)
.jsConfigure(scalaJSViteConfigure) // Because of didJS now uses NPM libs
.configure(docConfigure)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,24 @@ import zio.stream._
import fmgp.did._
import fmgp.did.comm._
import fmgp.crypto.error._
import zio.http.ChannelEvent.Read
import zio.http._

type SocketID = String

case class MyChannel(id: SocketID, socketOutHub: Hub[String])
case class DIDSocketManager(
sockets: Map[SocketID, MyChannel] = Map.empty,
ids: Map[FROMTO, Seq[SocketID]] = Map.empty,
kids: Map[VerificationMethodReferenced, Seq[SocketID]] = Map.empty,
tapBy: Seq[SocketID] = Seq.empty,
) {

def link(from: VerificationMethodReferenced, socketID: SocketID): DIDSocketManager =
def link(vmr: VerificationMethodReferenced, socketID: SocketID): DIDSocketManager =
if (!sockets.keySet.contains(socketID)) this // if sockets is close
else
kids.get(from) match
kids.get(vmr) match
case Some(seq) if seq.contains(socketID) => this
case Some(seq) => this.copy(kids = kids + (from -> (seq :+ socketID))).link(from.fromto, socketID)
case None => this.copy(kids = kids + (from -> Seq(socketID))).link(from.fromto, socketID)
case Some(seq) => this.copy(kids = kids + (vmr -> (seq :+ socketID))).link(vmr.did.asFROMTO, socketID)
case None => this.copy(kids = kids + (vmr -> Seq(socketID))).link(vmr.did.asFROMTO, socketID)

def link(from: FROMTO, socketID: SocketID): DIDSocketManager =
if (!sockets.keySet.contains(socketID)) this // if sockets is close
Expand All @@ -35,19 +35,27 @@ case class DIDSocketManager(
case Some(seq) => this.copy(ids = ids + (from -> (seq :+ socketID)))
case None => this.copy(ids = ids + (from -> Seq(socketID)))

def tap(socketID: SocketID) = this.copy(tapBy = tapBy :+ socketID)

def tapSockets = sockets.filter(e => tapBy.contains(e._1)).map(_._2)

def registerSocket(myChannel: MyChannel) = this.copy(sockets = sockets + (myChannel.id -> myChannel))

def unregisterSocket(socketID: SocketID) = this.copy(
sockets = sockets.view.filterKeys(_ != socketID).toMap,
ids = ids.map { case (did, socketsID) => (did, socketsID.filter(_ != socketID)) }.filterNot(_._2.isEmpty),
kids = kids.map { case (kid, socketsID) => (kid, socketsID.filter(_ != socketID)) }.filterNot(_._2.isEmpty),
tapBy = tapBy.filter(_ != socketID)
)

def publish(to: TO, msg: String): ZIO[Any, Nothing, Seq[Unit]] = {
val socketIDs = this.ids.getOrElse(to.asFROMTO, Seq.empty)
val myChannels = socketIDs.flatMap(id => this.sockets.get(id))
ZIO.foreach(myChannels) { channel =>
channel.socketOutHub
.publish(msg)
.flatMap {
case true => ZIO.logDebug(s"Publish Message to SocketID:${channel.id}")
case false => ZIO.logWarning(s"Publish Message return false in SocketID:${channel.id}")
}
}
}

}

object DIDSocketManager {
Expand All @@ -60,25 +68,54 @@ object DIDSocketManager {

def make = Ref.make(DIDSocketManager())

def tapSocket(didSubject: DIDSubject, channel: WebSocketChannel, channelId: String) =
for {
socketManager <- ZIO.service[Ref[DIDSocketManager]]
hub <- Hub.bounded[String](outBoundSize)
myChannel = MyChannel(channelId, hub)
_ <- socketManager.update { _.registerSocket(myChannel).tap(socketID = channelId) }
sink = ZSink.foreach((value: String) => channel.send(Read(WebSocketFrame.text(value))))
_ <- ZIO.log(s"Tapping into channel")
_ <- ZStream.fromHub(myChannel.socketOutHub).run(sink) // TODO .fork does not work!!!
_ <- ZIO.log(s"Tap channel concluded")
} yield ()

def registerSocket(channel: WebSocketChannel, channelId: String) =
for {
socketManager <- ZIO.service[Ref[DIDSocketManager]]
hub <- Hub.bounded[String](outBoundSize)
myChannel = MyChannel(channelId, hub)
_ <- socketManager.update { _.registerSocket(myChannel) }
sink = ZSink.foreach((value: String) => channel.send(Read(WebSocketFrame.text(value))))
_ <- channel.receiveAll {
case ChannelEvent.ExceptionCaught(cause) => ZIO.log("ExceptionCaught(cause) ")
case ChannelEvent.Read(message) =>
message match
case frame: WebSocketFrame.Binary => ZIO.log("Binary(bytes)")
case frame: WebSocketFrame.Text =>
frame.isFinal match
case false => ZIO.logError("frame.isFinal is false! text: $frame")
case true =>
frame.text match
case send if (send.startsWith("send")) =>
send.split(" ", 3).toSeq match
case Seq("send", did, message) =>
ZIO.service[Ref[DIDSocketManager]].flatMap(_.get).flatMap(_.publish(TO(did), message))
case test => // TODO REMOVE
for {
socketManager2 <- ZIO
.service[Ref[DIDSocketManager]]
.flatMap(_.get)
.flatMap(dsm =>
ZIO.foreach(dsm.sockets)((k, v) =>
v.socketOutHub.publish(s"TEST: $test").map(e => (k, e))
)
)
} yield ()
case link if (link.startsWith("link")) =>
link.split(" ", 2).toSeq match
case Seq("link", did) =>
ZIO.service[Ref[DIDSocketManager]].flatMap(_.update(_.link(FROMTO(did), channelId)))
case text => // TODO REMOVE
ZIO.logWarning(s"Like from Socket fail: $text")
case "info" => ZIO.service[Ref[DIDSocketManager]].flatMap(_.get).debug
case text => ZIO.log(s"Text:${text}")
case WebSocketFrame.Close(status, reason) => ZIO.log("Close(status, reason)")
case frame: WebSocketFrame.Continuation => ZIO.log("Continuation(buffer)")
case WebSocketFrame.Ping => ZIO.log("Ping")
case WebSocketFrame.Pong => ZIO.log("Pong")
case ChannelEvent.UserEventTriggered(event) => ZIO.log("UserEventTriggered(event)")
case ChannelEvent.Registered => ZIO.log("Registered")
case ChannelEvent.Unregistered => ZIO.log("Unregistered")
}.fork
sink = ZSink.foreach((value: String) => channel.send(ChannelEvent.Read(WebSocketFrame.text(value))))
_ <- ZIO.log(s"Registering channel")
_ <- ZStream.fromHub(myChannel.socketOutHub).run(sink) // TODO .fork does not work!!!
_ <- ZIO.log(s"Channel concluded")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ case class MediatorMultiAgent(
for {
msg <- data.fromJson[EncryptedMessage] match
case Left(error) =>
ZIO.logError(s"Data is not a EncryptedMessage: $error")
ZIO.logError(s"Data is not a EncryptedMessage: '$error'")
*> ZIO.fail(FailToParse(error))
case Right(message) =>
ZIO.logDebug(
Expand All @@ -89,16 +89,16 @@ case class MediatorMultiAgent(
.logAnnotate("msgHash", msg.hashCode.toString) {
for {
_ <- ZIO.log(s"receiveMessage with hashCode: ${msg.hashCode}")
_ <- didSocketManager.get.flatMap { m =>
ZIO.foreach(msg.recipientsSubject)(subject => m.publish(subject.asTO, msg.toJson))
}
maybeSyncReplyMsg <-
if (!msg.recipientsSubject.contains(id))
ZIO.logError(s"This mediator '${id.string}' is not a recipient")
*> ZIO.none
else
for {
plaintextMessage <- decrypt(msg)
_ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!!
ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson))
}
_ <- mSocketID match
case None => ZIO.unit
case Some(socketID) =>
Expand Down Expand Up @@ -127,7 +127,7 @@ case class MediatorMultiAgent(
channel.receiveAll {
case UserEventTriggered(UserEvent.HandshakeComplete) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
ZIO.log(s"HandshakeComplete $channelId") *>
ZIO.logDebug(s"HandshakeComplete $channelId") *>
DIDSocketManager.registerSocket(channel, channelId)
}
case UserEventTriggered(UserEvent.HandshakeTimeout) =>
Expand All @@ -144,10 +144,13 @@ case class MediatorMultiAgent(
}
case ChannelEvent.Read(WebSocketFrame.Text(text)) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
DIDSocketManager
.newMessage(channel, text, channelId)
.flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) }
.mapError(ex => DidException(ex))
ZIO.log(s"GOT NEW Message in socket_ID $channelId TEXT: $text") *>
DIDSocketManager
.newMessage(channel, text, channelId)
.flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) }
.debug
.tapError(ex => ZIO.logError(s"Error: ${ex}"))
.mapError(ex => DidException(ex))
}
case ChannelEvent.Read(any) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
Expand All @@ -163,45 +166,6 @@ case class MediatorMultiAgent(
.provideSomeEnvironment { (env) => env.add(env.get[MediatorMultiAgent].didSocketManager) }
}

def websocketListenerApp(
annotationMap: Seq[LogAnnotation]
): ZIO[MediatorMultiAgent & Operations & MessageDispatcher, Nothing, zio.http.Response] = {
import zio.http.ChannelEvent._
val SOCKET_ID = "SocketID"
Handler
.webSocket { channel => // WebSocketChannel = Channel[ChannelEvent[WebSocketFrame], ChannelEvent[WebSocketFrame]]
val channelId = scala.util.Random.nextLong().toString
channel.receiveAll {
case UserEventTriggered(UserEvent.HandshakeComplete) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
ZIO.log(s"HandshakeComplete $channelId") *>
DIDSocketManager.tapSocket(id, channel, channelId)
}
case UserEventTriggered(UserEvent.HandshakeTimeout) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
ZIO.logWarning(s"HandshakeTimeout $channelId")
}
case ChannelEvent.Registered =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
DIDSocketManager.registerSocket(channel, channelId)
}
case ChannelEvent.Unregistered =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
DIDSocketManager.unregisterSocket(channel, channelId)
}
case ChannelEvent.Read(any) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
ZIO.logWarning(s"Ignored Message from '${channelId}'")
}
case ChannelEvent.ExceptionCaught(ex) =>
ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) {
ZIO.log(ex.getMessage())
}
}
}
.toResponse
.provideSomeEnvironment { (env) => env.add(env.get[MediatorMultiAgent].didSocketManager) }
}
}

object MediatorMultiAgent {
Expand All @@ -217,50 +181,14 @@ object MediatorMultiAgent {
// def didCommApp: HttpApp[Hub[String] & AgentByHost & Operations & MessageDispatcher] = Routes(
def didCommApp = Routes(
Method.GET / "ws" -> handler { (req: Request) =>
if (
req
// FIXME after https://github.com/zio/zio-http/issues/2416
// .header(Header.ContentType)
// .exists { h =>
// h.mediaType.mainType == MediaTypes.mainType &&
// (h.mediaType.subType == MediaTypes.SIGNED.subType || h.mediaType.subType == MediaTypes.ENCRYPTED.subType)
.headers
.get("content-type")
.exists { h => h == MediaTypes.SIGNED.typ || h == MediaTypes.ENCRYPTED.typ }
) {
(for {
agent <- AgentByHost.getAgentFor(req)
annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq)
ret <- agent
.createSocketApp(annotationMap)
.provideSomeEnvironment((env: ZEnvironment[Operations & MessageDispatcher]) => env.add(agent))
} yield (ret)
// } else ZIO.succeed(Response(status = Status.NotFound))
).orDie
} else {
(
for {
agent <- AgentByHost.getAgentFor(req)
data <- req.body.asString
ret <- agent
.receiveMessage(data, None)
.provideSomeEnvironment((env: ZEnvironment[Operations & MessageDispatcher]) => env.add(agent))
.mapError(fail => DidException(fail))
} yield Response
.text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}")
// .copy(status = Status.BadRequest) but ok for now
).orDie
}
},
Method.GET / "tap" / string("host") -> handler { (host: String, req: Request) =>
for {
agent <- AgentByHost.getAgentFor(Host(host))
agent <- AgentByHost.getAgentFor(req).debug
annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq)
ret <- agent
.websocketListenerApp(annotationMap)
.createSocketApp(annotationMap)
.provideSomeEnvironment((env: ZEnvironment[Operations & MessageDispatcher]) => env.add(agent))
} yield (ret)
}.orDie,
},
Method.POST / trailing -> handler { (req: Request) =>
if (
req
Expand Down
29 changes: 12 additions & 17 deletions demo/jvm/src/main/scala/fmgp/did/demo/AgentByHost.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ object MyHeaders { // extends HeaderNames {

object AgentByHost {

def getAgentFor(req: Request) = ZIO
.serviceWithZIO[AgentByHost](_.agentFromRequest(req))
.mapError(ex => DidException(ex))
def getAgentFor(req: Request) = ZIO.serviceWithZIO[AgentByHost](_.agentFromRequest(req))

def getAgentFor(host: Host) = ZIO
.serviceWithZIO[AgentByHost](_.agentFromHost(host))
.tapError(ex => ZIO.logError(ex.toString()))
.mapError(ex => DidException(ex))

def provideAgentFor[R, E <: Exception, A](req: Request, job: ZIO[R & MediatorMultiAgent, E, A]) =
for {
agent <- ZIO
.serviceWithZIO[AgentByHost](_.agentFromRequest(req))
.mapError(ex => DidException(ex))
agent <- ZIO.serviceWithZIO[AgentByHost](_.agentFromRequest(req))
ret <- job.provideSomeEnvironment((env: ZEnvironment[R]) => env.add(agent))
} yield ()

Expand All @@ -53,28 +50,26 @@ object AgentByHost {
charlie <- MediatorMultiAgent.make(AgentProvider.charlie)
local <- MediatorMultiAgent.make(AgentProvider.local)
} yield AgentByHost(
Map(
defaultAgent = local,
agents = Map(
Host.alice -> alice,
Host.bob -> bob,
Host.charlie -> charlie,
Host("localhost:8080") -> local,
)
)
)
}

case class AgentByHost(agents: Map[Host, MediatorMultiAgent]) {
case class AgentByHost(defaultAgent: MediatorMultiAgent, agents: Map[Host, MediatorMultiAgent]) {

def agentFromRequest(req: Request): zio.ZIO[Any, NoAgent, MediatorMultiAgent] =
AgentByHost.hostFromRequest(req) match
case None => ZIO.fail(NoAgent(s"Unknown host"))
case Some(host) =>
agents.get(host) match
case None => ZIO.fail(NoAgent(s"No Agent config for $host"))
case Some(agent) => ZIO.succeed(agent)
def agentFromRequest(req: Request): zio.ZIO[Any, Nothing, MediatorMultiAgent] =
AgentByHost
.hostFromRequest(req)
.flatMap { host => agents.get(host).map(agent => ZIO.succeed(agent)) }
.getOrElse(ZIO.succeed(defaultAgent))

def agentFromHost(host: Host): zio.ZIO[Any, NoAgent, MediatorMultiAgent] =
agents.get(host) match
case None => ZIO.fail(NoAgent(s"No Agent config for $host"))
case None => ZIO.fail(NoAgent(s"No Agent config for '$host'"))
case Some(agent) => ZIO.succeed(agent)
}
Loading