From 65ca8cde3b786bbe2df54d69e4469724814ec595 Mon Sep 17 00:00:00 2001 From: FabioPinheiro Date: Fri, 27 Oct 2023 23:04:00 +0100 Subject: [PATCH] New type Transport and replace WebsocketJS --- build.sbt | 6 +- .../did/comm/mediator/DIDSocketManager.scala | 85 ++++++++++----- .../comm/mediator/MediatorMultiAgent.scala | 102 +++--------------- .../scala/fmgp/did/demo/AgentByHost.scala | 29 +++-- .../main/scala/fmgp/did/demo/AppServer.scala | 31 +----- .../shared/src/main/scala/fmgp/did/Host.scala | 1 + .../main/scala/fmgp/util/TransportWSImp.scala | 101 +++++++++++++++++ .../scala/fmgp/util/DIDSocketManager.scala | 11 +- .../src/main/scala/fmgp/util/Transport.scala | 16 +++ .../src/main/scala/fmgp/util/Websocket.scala | 46 ++++++++ .../scala/fmgp/did/VerificationMethod.scala | 1 - .../main/scala/fmgp/did/comm/Message.scala | 1 + .../main/scala/fmgp/did/comm/Operations.scala | 22 ++-- .../scala/fmgp/did/comm/OperationsImp.scala | 2 +- vite.config.js | 2 +- webapp/src/main/scala/fmgp/Utils.scala | 26 +++++ webapp/src/main/scala/fmgp/Websocket.scala | 93 ---------------- webapp/src/main/scala/fmgp/WebsocketJS.scala | 54 ---------- .../main/scala/fmgp/webapp/DecryptTool.scala | 3 +- .../main/scala/fmgp/webapp/EncryptTool.scala | 6 +- .../src/main/scala/fmgp/webapp/Global.scala | 28 ++++- webapp/src/main/scala/fmgp/webapp/Home.scala | 7 +- .../scala/fmgp/webapp/TapIntoStreamTool.scala | 79 +++++++++++--- 23 files changed, 399 insertions(+), 353 deletions(-) create mode 100644 did-extra/js/src/main/scala/fmgp/util/TransportWSImp.scala create mode 100644 did-extra/shared/src/main/scala/fmgp/util/Transport.scala create mode 100644 did-extra/shared/src/main/scala/fmgp/util/Websocket.scala delete mode 100644 webapp/src/main/scala/fmgp/Websocket.scala delete mode 100644 webapp/src/main/scala/fmgp/WebsocketJS.scala diff --git a/build.sbt b/build.sbt index cedab7ee..b414372d 100644 --- a/build.sbt +++ b/build.sbt @@ -324,6 +324,7 @@ lazy val didExperiments = crossProject(JSPlatform, JVMPlatform) .jsConfigure(scalaJSViteConfigure) // Because of didJS now uses NPM libs .configure(docConfigure) +//TODO Rename did-extra to did-framework lazy val didExtra = crossProject(JSPlatform, JVMPlatform) .in(file("did-extra")) .configure(notYetPublishedConfigure) // FIXME @@ -332,9 +333,8 @@ lazy val didExtra = crossProject(JSPlatform, JVMPlatform) libraryDependencies += D.zioMunitTest.value, ) .dependsOn(did % "compile;test->test") - .jvmSettings( - libraryDependencies += D.ziohttp.value, - ) + .jvmSettings(libraryDependencies += D.ziohttp.value) + .jsSettings(libraryDependencies += D.dom.value) .jsConfigure(scalaJSViteConfigure) // Because of didJS now uses NPM libs .configure(docConfigure) diff --git a/demo/jvm/src/main/scala/fmgp/did/comm/mediator/DIDSocketManager.scala b/demo/jvm/src/main/scala/fmgp/did/comm/mediator/DIDSocketManager.scala index 8c8a0bb4..e28370f5 100644 --- a/demo/jvm/src/main/scala/fmgp/did/comm/mediator/DIDSocketManager.scala +++ b/demo/jvm/src/main/scala/fmgp/did/comm/mediator/DIDSocketManager.scala @@ -8,24 +8,24 @@ import zio.stream._ import fmgp.did._ import fmgp.did.comm._ import fmgp.crypto.error._ -import zio.http.ChannelEvent.Read +import zio.http._ type SocketID = String + case class MyChannel(id: SocketID, socketOutHub: Hub[String]) case class DIDSocketManager( sockets: Map[SocketID, MyChannel] = Map.empty, ids: Map[FROMTO, Seq[SocketID]] = Map.empty, kids: Map[VerificationMethodReferenced, Seq[SocketID]] = Map.empty, - tapBy: Seq[SocketID] = Seq.empty, ) { - def link(from: VerificationMethodReferenced, socketID: SocketID): DIDSocketManager = + def link(vmr: VerificationMethodReferenced, socketID: SocketID): DIDSocketManager = if (!sockets.keySet.contains(socketID)) this // if sockets is close else - kids.get(from) match + kids.get(vmr) match case Some(seq) if seq.contains(socketID) => this - case Some(seq) => this.copy(kids = kids + (from -> (seq :+ socketID))).link(from.fromto, socketID) - case None => this.copy(kids = kids + (from -> Seq(socketID))).link(from.fromto, socketID) + case Some(seq) => this.copy(kids = kids + (vmr -> (seq :+ socketID))).link(vmr.did.asFROMTO, socketID) + case None => this.copy(kids = kids + (vmr -> Seq(socketID))).link(vmr.did.asFROMTO, socketID) def link(from: FROMTO, socketID: SocketID): DIDSocketManager = if (!sockets.keySet.contains(socketID)) this // if sockets is close @@ -35,19 +35,27 @@ case class DIDSocketManager( case Some(seq) => this.copy(ids = ids + (from -> (seq :+ socketID))) case None => this.copy(ids = ids + (from -> Seq(socketID))) - def tap(socketID: SocketID) = this.copy(tapBy = tapBy :+ socketID) - - def tapSockets = sockets.filter(e => tapBy.contains(e._1)).map(_._2) - def registerSocket(myChannel: MyChannel) = this.copy(sockets = sockets + (myChannel.id -> myChannel)) def unregisterSocket(socketID: SocketID) = this.copy( sockets = sockets.view.filterKeys(_ != socketID).toMap, ids = ids.map { case (did, socketsID) => (did, socketsID.filter(_ != socketID)) }.filterNot(_._2.isEmpty), kids = kids.map { case (kid, socketsID) => (kid, socketsID.filter(_ != socketID)) }.filterNot(_._2.isEmpty), - tapBy = tapBy.filter(_ != socketID) ) + def publish(to: TO, msg: String): ZIO[Any, Nothing, Seq[Unit]] = { + val socketIDs = this.ids.getOrElse(to.asFROMTO, Seq.empty) + val myChannels = socketIDs.flatMap(id => this.sockets.get(id)) + ZIO.foreach(myChannels) { channel => + channel.socketOutHub + .publish(msg) + .flatMap { + case true => ZIO.logDebug(s"Publish Message to SocketID:${channel.id}") + case false => ZIO.logWarning(s"Publish Message return false in SocketID:${channel.id}") + } + } + } + } object DIDSocketManager { @@ -60,25 +68,54 @@ object DIDSocketManager { def make = Ref.make(DIDSocketManager()) - def tapSocket(didSubject: DIDSubject, channel: WebSocketChannel, channelId: String) = - for { - socketManager <- ZIO.service[Ref[DIDSocketManager]] - hub <- Hub.bounded[String](outBoundSize) - myChannel = MyChannel(channelId, hub) - _ <- socketManager.update { _.registerSocket(myChannel).tap(socketID = channelId) } - sink = ZSink.foreach((value: String) => channel.send(Read(WebSocketFrame.text(value)))) - _ <- ZIO.log(s"Tapping into channel") - _ <- ZStream.fromHub(myChannel.socketOutHub).run(sink) // TODO .fork does not work!!! - _ <- ZIO.log(s"Tap channel concluded") - } yield () - def registerSocket(channel: WebSocketChannel, channelId: String) = for { socketManager <- ZIO.service[Ref[DIDSocketManager]] hub <- Hub.bounded[String](outBoundSize) myChannel = MyChannel(channelId, hub) _ <- socketManager.update { _.registerSocket(myChannel) } - sink = ZSink.foreach((value: String) => channel.send(Read(WebSocketFrame.text(value)))) + _ <- channel.receiveAll { + case ChannelEvent.ExceptionCaught(cause) => ZIO.log("ExceptionCaught(cause) ") + case ChannelEvent.Read(message) => + message match + case frame: WebSocketFrame.Binary => ZIO.log("Binary(bytes)") + case frame: WebSocketFrame.Text => + frame.isFinal match + case false => ZIO.logError("frame.isFinal is false! text: $frame") + case true => + frame.text match + case send if (send.startsWith("send")) => + send.split(" ", 3).toSeq match + case Seq("send", did, message) => + ZIO.service[Ref[DIDSocketManager]].flatMap(_.get).flatMap(_.publish(TO(did), message)) + case test => // TODO REMOVE + for { + socketManager2 <- ZIO + .service[Ref[DIDSocketManager]] + .flatMap(_.get) + .flatMap(dsm => + ZIO.foreach(dsm.sockets)((k, v) => + v.socketOutHub.publish(s"TEST: $test").map(e => (k, e)) + ) + ) + } yield () + case link if (link.startsWith("link")) => + link.split(" ", 2).toSeq match + case Seq("link", did) => + ZIO.service[Ref[DIDSocketManager]].flatMap(_.update(_.link(FROMTO(did), channelId))) + case text => // TODO REMOVE + ZIO.logWarning(s"Like from Socket fail: $text") + case "info" => ZIO.service[Ref[DIDSocketManager]].flatMap(_.get).debug + case text => ZIO.log(s"Text:${text}") + case WebSocketFrame.Close(status, reason) => ZIO.log("Close(status, reason)") + case frame: WebSocketFrame.Continuation => ZIO.log("Continuation(buffer)") + case WebSocketFrame.Ping => ZIO.log("Ping") + case WebSocketFrame.Pong => ZIO.log("Pong") + case ChannelEvent.UserEventTriggered(event) => ZIO.log("UserEventTriggered(event)") + case ChannelEvent.Registered => ZIO.log("Registered") + case ChannelEvent.Unregistered => ZIO.log("Unregistered") + }.fork + sink = ZSink.foreach((value: String) => channel.send(ChannelEvent.Read(WebSocketFrame.text(value)))) _ <- ZIO.log(s"Registering channel") _ <- ZStream.fromHub(myChannel.socketOutHub).run(sink) // TODO .fork does not work!!! _ <- ZIO.log(s"Channel concluded") diff --git a/demo/jvm/src/main/scala/fmgp/did/comm/mediator/MediatorMultiAgent.scala b/demo/jvm/src/main/scala/fmgp/did/comm/mediator/MediatorMultiAgent.scala index 5547171e..569c1b59 100644 --- a/demo/jvm/src/main/scala/fmgp/did/comm/mediator/MediatorMultiAgent.scala +++ b/demo/jvm/src/main/scala/fmgp/did/comm/mediator/MediatorMultiAgent.scala @@ -71,7 +71,7 @@ case class MediatorMultiAgent( for { msg <- data.fromJson[EncryptedMessage] match case Left(error) => - ZIO.logError(s"Data is not a EncryptedMessage: $error") + ZIO.logError(s"Data is not a EncryptedMessage: '$error'") *> ZIO.fail(FailToParse(error)) case Right(message) => ZIO.logDebug( @@ -89,6 +89,9 @@ case class MediatorMultiAgent( .logAnnotate("msgHash", msg.hashCode.toString) { for { _ <- ZIO.log(s"receiveMessage with hashCode: ${msg.hashCode}") + _ <- didSocketManager.get.flatMap { m => + ZIO.foreach(msg.recipientsSubject)(subject => m.publish(subject.asTO, msg.toJson)) + } maybeSyncReplyMsg <- if (!msg.recipientsSubject.contains(id)) ZIO.logError(s"This mediator '${id.string}' is not a recipient") @@ -96,9 +99,6 @@ case class MediatorMultiAgent( else for { plaintextMessage <- decrypt(msg) - _ <- didSocketManager.get.flatMap { m => // TODO HACK REMOVE !!!!!!!!!!!!!!!!!!!!!!!! - ZIO.foreach(m.tapSockets)(_.socketOutHub.publish(TapMessage(msg, plaintextMessage).toJson)) - } _ <- mSocketID match case None => ZIO.unit case Some(socketID) => @@ -127,7 +127,7 @@ case class MediatorMultiAgent( channel.receiveAll { case UserEventTriggered(UserEvent.HandshakeComplete) => ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - ZIO.log(s"HandshakeComplete $channelId") *> + ZIO.logDebug(s"HandshakeComplete $channelId") *> DIDSocketManager.registerSocket(channel, channelId) } case UserEventTriggered(UserEvent.HandshakeTimeout) => @@ -144,10 +144,13 @@ case class MediatorMultiAgent( } case ChannelEvent.Read(WebSocketFrame.Text(text)) => ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - DIDSocketManager - .newMessage(channel, text, channelId) - .flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) } - .mapError(ex => DidException(ex)) + ZIO.log(s"GOT NEW Message in socket_ID $channelId TEXT: $text") *> + DIDSocketManager + .newMessage(channel, text, channelId) + .flatMap { case (socketID, encryptedMessage) => receiveMessage(encryptedMessage, Some(socketID)) } + .debug + .tapError(ex => ZIO.logError(s"Error: ${ex}")) + .mapError(ex => DidException(ex)) } case ChannelEvent.Read(any) => ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { @@ -163,45 +166,6 @@ case class MediatorMultiAgent( .provideSomeEnvironment { (env) => env.add(env.get[MediatorMultiAgent].didSocketManager) } } - def websocketListenerApp( - annotationMap: Seq[LogAnnotation] - ): ZIO[MediatorMultiAgent & Operations & MessageDispatcher, Nothing, zio.http.Response] = { - import zio.http.ChannelEvent._ - val SOCKET_ID = "SocketID" - Handler - .webSocket { channel => // WebSocketChannel = Channel[ChannelEvent[WebSocketFrame], ChannelEvent[WebSocketFrame]] - val channelId = scala.util.Random.nextLong().toString - channel.receiveAll { - case UserEventTriggered(UserEvent.HandshakeComplete) => - ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - ZIO.log(s"HandshakeComplete $channelId") *> - DIDSocketManager.tapSocket(id, channel, channelId) - } - case UserEventTriggered(UserEvent.HandshakeTimeout) => - ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - ZIO.logWarning(s"HandshakeTimeout $channelId") - } - case ChannelEvent.Registered => - ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - DIDSocketManager.registerSocket(channel, channelId) - } - case ChannelEvent.Unregistered => - ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - DIDSocketManager.unregisterSocket(channel, channelId) - } - case ChannelEvent.Read(any) => - ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - ZIO.logWarning(s"Ignored Message from '${channelId}'") - } - case ChannelEvent.ExceptionCaught(ex) => - ZIO.logAnnotate(LogAnnotation(SOCKET_ID, channelId), annotationMap: _*) { - ZIO.log(ex.getMessage()) - } - } - } - .toResponse - .provideSomeEnvironment { (env) => env.add(env.get[MediatorMultiAgent].didSocketManager) } - } } object MediatorMultiAgent { @@ -217,50 +181,14 @@ object MediatorMultiAgent { // def didCommApp: HttpApp[Hub[String] & AgentByHost & Operations & MessageDispatcher] = Routes( def didCommApp = Routes( Method.GET / "ws" -> handler { (req: Request) => - if ( - req - // FIXME after https://github.com/zio/zio-http/issues/2416 - // .header(Header.ContentType) - // .exists { h => - // h.mediaType.mainType == MediaTypes.mainType && - // (h.mediaType.subType == MediaTypes.SIGNED.subType || h.mediaType.subType == MediaTypes.ENCRYPTED.subType) - .headers - .get("content-type") - .exists { h => h == MediaTypes.SIGNED.typ || h == MediaTypes.ENCRYPTED.typ } - ) { - (for { - agent <- AgentByHost.getAgentFor(req) - annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) - ret <- agent - .createSocketApp(annotationMap) - .provideSomeEnvironment((env: ZEnvironment[Operations & MessageDispatcher]) => env.add(agent)) - } yield (ret) - // } else ZIO.succeed(Response(status = Status.NotFound)) - ).orDie - } else { - ( - for { - agent <- AgentByHost.getAgentFor(req) - data <- req.body.asString - ret <- agent - .receiveMessage(data, None) - .provideSomeEnvironment((env: ZEnvironment[Operations & MessageDispatcher]) => env.add(agent)) - .mapError(fail => DidException(fail)) - } yield Response - .text(s"The content-type must be ${MediaTypes.SIGNED.typ} or ${MediaTypes.ENCRYPTED.typ}") - // .copy(status = Status.BadRequest) but ok for now - ).orDie - } - }, - Method.GET / "tap" / string("host") -> handler { (host: String, req: Request) => for { - agent <- AgentByHost.getAgentFor(Host(host)) + agent <- AgentByHost.getAgentFor(req).debug annotationMap <- ZIO.logAnnotations.map(_.map(e => LogAnnotation(e._1, e._2)).toSeq) ret <- agent - .websocketListenerApp(annotationMap) + .createSocketApp(annotationMap) .provideSomeEnvironment((env: ZEnvironment[Operations & MessageDispatcher]) => env.add(agent)) } yield (ret) - }.orDie, + }, Method.POST / trailing -> handler { (req: Request) => if ( req diff --git a/demo/jvm/src/main/scala/fmgp/did/demo/AgentByHost.scala b/demo/jvm/src/main/scala/fmgp/did/demo/AgentByHost.scala index 42034fa8..f9e98437 100644 --- a/demo/jvm/src/main/scala/fmgp/did/demo/AgentByHost.scala +++ b/demo/jvm/src/main/scala/fmgp/did/demo/AgentByHost.scala @@ -17,19 +17,16 @@ object MyHeaders { // extends HeaderNames { object AgentByHost { - def getAgentFor(req: Request) = ZIO - .serviceWithZIO[AgentByHost](_.agentFromRequest(req)) - .mapError(ex => DidException(ex)) + def getAgentFor(req: Request) = ZIO.serviceWithZIO[AgentByHost](_.agentFromRequest(req)) def getAgentFor(host: Host) = ZIO .serviceWithZIO[AgentByHost](_.agentFromHost(host)) + .tapError(ex => ZIO.logError(ex.toString())) .mapError(ex => DidException(ex)) def provideAgentFor[R, E <: Exception, A](req: Request, job: ZIO[R & MediatorMultiAgent, E, A]) = for { - agent <- ZIO - .serviceWithZIO[AgentByHost](_.agentFromRequest(req)) - .mapError(ex => DidException(ex)) + agent <- ZIO.serviceWithZIO[AgentByHost](_.agentFromRequest(req)) ret <- job.provideSomeEnvironment((env: ZEnvironment[R]) => env.add(agent)) } yield () @@ -53,28 +50,26 @@ object AgentByHost { charlie <- MediatorMultiAgent.make(AgentProvider.charlie) local <- MediatorMultiAgent.make(AgentProvider.local) } yield AgentByHost( - Map( + defaultAgent = local, + agents = Map( Host.alice -> alice, Host.bob -> bob, Host.charlie -> charlie, - Host("localhost:8080") -> local, ) ) ) } -case class AgentByHost(agents: Map[Host, MediatorMultiAgent]) { +case class AgentByHost(defaultAgent: MediatorMultiAgent, agents: Map[Host, MediatorMultiAgent]) { - def agentFromRequest(req: Request): zio.ZIO[Any, NoAgent, MediatorMultiAgent] = - AgentByHost.hostFromRequest(req) match - case None => ZIO.fail(NoAgent(s"Unknown host")) - case Some(host) => - agents.get(host) match - case None => ZIO.fail(NoAgent(s"No Agent config for $host")) - case Some(agent) => ZIO.succeed(agent) + def agentFromRequest(req: Request): zio.ZIO[Any, Nothing, MediatorMultiAgent] = + AgentByHost + .hostFromRequest(req) + .flatMap { host => agents.get(host).map(agent => ZIO.succeed(agent)) } + .getOrElse(ZIO.succeed(defaultAgent)) def agentFromHost(host: Host): zio.ZIO[Any, NoAgent, MediatorMultiAgent] = agents.get(host) match - case None => ZIO.fail(NoAgent(s"No Agent config for $host")) + case None => ZIO.fail(NoAgent(s"No Agent config for '$host'")) case Some(agent) => ZIO.succeed(agent) } diff --git a/demo/jvm/src/main/scala/fmgp/did/demo/AppServer.scala b/demo/jvm/src/main/scala/fmgp/did/demo/AppServer.scala index dd279792..37f7b481 100644 --- a/demo/jvm/src/main/scala/fmgp/did/demo/AppServer.scala +++ b/demo/jvm/src/main/scala/fmgp/did/demo/AppServer.scala @@ -34,7 +34,7 @@ import zio.http.MediaTypes * * curl 'http://localhost:8080/db' -H "host: alice.did.fmgp.app" * - * wscat -c ws://localhost:8080 --host "alice.did.fmgp.app" -H 'content-type: application/didcomm-encrypted+json' + * wscat -c ws://localhost:8080/ws * * curl -X POST localhost:8080 -H "host: alice.did.fmgp.app" -H 'content-type: application/didcomm-encrypted+json' -d * '{}' @@ -91,35 +91,6 @@ object AppServer extends ZIOAppDefault { ) } yield (ret) }, - Method.GET / "socket" -> handler { (req: Request) => - for { - _ <- ZIO.log("socket") - agent <- AgentByHost.getAgentFor(req) - sm <- agent.didSocketManager.get - ret <- ZIO.succeed(Response.text(sm.toJsonPretty)) - } yield (ret) - }, - Method.POST / "socket" / string("id") -> handler { (id: String, req: Request) => - for { - hub <- ZIO.service[Hub[String]] - agent <- AgentByHost.getAgentFor(req) - sm <- agent.didSocketManager.get - ret <- sm.ids - .get(FROMTO(id)) - .toSeq - .flatMap { socketsID => - socketsID.flatMap(id => sm.sockets.get(id).map(e => (id, e))).toSeq - } match { - case Seq() => - req.body.asString.flatMap(e => hub.publish(s"socket missing for $id")) - *> ZIO.succeed(Response.text(s"socket missing")) - case seq => - ZIO.foreach(seq) { (socketID, channel) => - req.body.asString.flatMap(e => channel.socketOutHub.publish(e)) - } *> ZIO.succeed(Response.text(s"message sended")) - } - } yield (ret) - }, Method.POST / "ops" -> handler { (req: Request) => req.body.asString .tap(e => ZIO.log("ops")) diff --git a/did-example/shared/src/main/scala/fmgp/did/Host.scala b/did-example/shared/src/main/scala/fmgp/did/Host.scala index e8f8032f..67d7114d 100644 --- a/did-example/shared/src/main/scala/fmgp/did/Host.scala +++ b/did-example/shared/src/main/scala/fmgp/did/Host.scala @@ -17,4 +17,5 @@ object Host { val alice = Host("alice.did.fmgp.app") val bob = Host("bob.did.fmgp.app") val charlie = Host("charlie.did.fmgp.app") + val default = Host("localhost") } diff --git a/did-extra/js/src/main/scala/fmgp/util/TransportWSImp.scala b/did-extra/js/src/main/scala/fmgp/util/TransportWSImp.scala new file mode 100644 index 00000000..9f61f185 --- /dev/null +++ b/did-extra/js/src/main/scala/fmgp/util/TransportWSImp.scala @@ -0,0 +1,101 @@ +package fmgp.util + +import scala.scalajs.js +import org.scalajs.dom +import zio._ +import zio.json._ +import fmgp.did.comm._ +import zio.stream._ + +type OutErr = Nothing +type InErr = Nothing + +/** this API is still a WIP + * + * The Auto reconnect feature was remove. + */ +class TransportWSImp[MSG]( + private val outboundBuf: Queue[MSG], + private val inboundBuf: Hub[MSG], + /*private val*/ jsWS: dom.WebSocket, +) extends Transport[Any, MSG] { + + def outbound: ZSink[Any, OutErr, MSG, Nothing, Unit] = ZSink.fromQueue(outboundBuf) + def inbound: ZStream[Any, InErr, MSG] = ZStream.fromHub(inboundBuf) + + def send(message: MSG): zio.UIO[Boolean] = + ZIO.log(s"send $message") *> + outboundBuf.offer(message) + def subscribe: ZIO[Scope, Nothing, Dequeue[MSG]] = inboundBuf.subscribe + def recive[R, E](process: (MSG) => ZIO[R, E, Unit]) = inbound.runForeach(process) + +} + +object TransportWSImp { + type MSG = String + + def wsUrlFromWindowLocation = org.scalajs.dom.window.location.origin.replaceFirst("http", "ws") + "/ws" + + def layer: ZLayer[Any, Nothing, TransportWSImp[MSG]] = ZLayer.fromZIO(make()) + + def makeWSProgram: Websocket = new Websocket { + var state: Websocket.State = Websocket.State.CONNECTING + def onMessage(message: String): UIO[Unit] = ZIO.logDebug(s"onMessage: $message") + def onStateChange(s: Websocket.State): UIO[Unit] = ZIO.succeed({ state = s }) + } + + def make( + wsUrl: String = wsUrlFromWindowLocation, // "ws://localhost:8080/ws", + boundSize: Int = 10, + wsProgram: Websocket = makeWSProgram, + ): ZIO[Any, Nothing, TransportWSImp[MSG]] = for { + outbound <- Queue.bounded[MSG](boundSize) + inbound <- Hub.bounded[MSG](boundSize) + + // JS WebSocket bindings onOpen/onClose/onMessage/onError + tmpWS = new dom.WebSocket(wsUrl) + + transportWS = new TransportWSImp[MSG](outbound, inbound, tmpWS) + _ <- ZIO.logDebug("transportWS.bindings") + _ <- ZIO.unit.delay(1.second).debug + streamSendMessages <- ZStream.fromQueue(outbound).runForeach(data => ZIO.succeed(tmpWS.send(data))).fork + streamOnMessage <- ZStream + .async[Any, Nothing, Unit] { callback => + tmpWS.onopen = { (ev: dom.Event) => + callback( + wsProgram.onStateChange(Websocket.State.OPEN) *> + wsProgram.onOpen(ev.`type`) *> + ZIO.succeed(Chunk()) + ) + } + tmpWS.onmessage = { (ev: dom.MessageEvent) => + callback { + val data = ev.data.toString + wsProgram.onMessage(data) *> inbound.offer(data) *> ZIO.succeed(Chunk()) + } + } + tmpWS.onerror = { (ev: dom.Event) => + val message = ev + .asInstanceOf[js.Dynamic] + .message + .asInstanceOf[js.UndefOr[String]] + .fold("")("Error: " + _) + callback( + wsProgram.onStateChange(Websocket.State.CLOSED) *> + wsProgram.onError(ev.`type`, message) *> + ZIO.succeed(Chunk()) + ) + } + tmpWS.onclose = { (ev: dom.CloseEvent) => + callback( + wsProgram.onStateChange(Websocket.State.CLOSED) *> + wsProgram.onClose(ev.reason) *> + ZIO.succeed(Chunk()) + ) + } + } + .runDrain + .fork + _ <- ZIO.log("Make TransportWS created (and bindings done)") + } yield transportWS +} diff --git a/did-extra/jvm/src/main/scala/fmgp/util/DIDSocketManager.scala b/did-extra/jvm/src/main/scala/fmgp/util/DIDSocketManager.scala index a7e2e421..f5f94be5 100644 --- a/did-extra/jvm/src/main/scala/fmgp/util/DIDSocketManager.scala +++ b/did-extra/jvm/src/main/scala/fmgp/util/DIDSocketManager.scala @@ -10,6 +10,8 @@ import fmgp.did._ import fmgp.did.comm._ import fmgp.crypto.error._ +//FIXME REMOVE!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +@deprecated("this class will be removed and also is duplicated") case class DIDSocketManager( sockets: Map[SocketID, MyChannel] = Map.empty, ids: Map[FROMTO, Seq[SocketID]] = Map.empty, @@ -17,13 +19,13 @@ case class DIDSocketManager( tapBy: Seq[SocketID] = Seq.empty, ) { - def link(from: VerificationMethodReferenced, socketID: SocketID): DIDSocketManager = + def link(vmr: VerificationMethodReferenced, socketID: SocketID): DIDSocketManager = if (!sockets.keySet.contains(socketID)) this // if sockets is close else - kids.get(from) match + kids.get(vmr) match case Some(seq) if seq.contains(socketID) => this - case Some(seq) => this.copy(kids = kids + (from -> (seq :+ socketID))).link(from.fromto, socketID) - case None => this.copy(kids = kids + (from -> Seq(socketID))).link(from.fromto, socketID) + case Some(seq) => this.copy(kids = kids + (vmr -> (seq :+ socketID))).link(vmr.did.asFROMTO, socketID) + case None => this.copy(kids = kids + (vmr -> Seq(socketID))).link(vmr.did.asFROMTO, socketID) def link(from: FROMTO, socketID: SocketID): DIDSocketManager = if (!sockets.keySet.contains(socketID)) this // if sockets is close @@ -48,6 +50,7 @@ case class DIDSocketManager( } +@deprecated("this class will be removed and also is duplicated") object DIDSocketManager { def inBoundSize = 5 def outBoundSize = 3 diff --git a/did-extra/shared/src/main/scala/fmgp/util/Transport.scala b/did-extra/shared/src/main/scala/fmgp/util/Transport.scala new file mode 100644 index 00000000..71ec6cef --- /dev/null +++ b/did-extra/shared/src/main/scala/fmgp/util/Transport.scala @@ -0,0 +1,16 @@ +package fmgp.util + +import zio._ +import zio.stream._ +import fmgp.did.comm._ + +type TransportDIDComm[R, MSG] = Transport[R, Message] + +/** The goal is to make this DID Comm library Transport-agnostic */ +trait Transport[R, MSG] { + type OutErr = Nothing + type InErr = Nothing + + def outbound: ZSink[R, OutErr, MSG, Nothing, Unit] + def inbound: ZStream[R, InErr, MSG] +} diff --git a/did-extra/shared/src/main/scala/fmgp/util/Websocket.scala b/did-extra/shared/src/main/scala/fmgp/util/Websocket.scala new file mode 100644 index 00000000..68d1d612 --- /dev/null +++ b/did-extra/shared/src/main/scala/fmgp/util/Websocket.scala @@ -0,0 +1,46 @@ +package fmgp.util + +import zio._ + +trait Websocket { + def onOpen(evType: String): UIO[Unit] = Console.printLine(s"WS Connected '$evType'").orDie + def onClose(reason: String): UIO[Unit] = Console.printLine(s"WS Closed because '${reason}'").orDie + def onMessage(message: String): UIO[Unit] + def onError(evType: String, errorMessage: String): UIO[Unit] = + Console.printLine(s"WS Error (type:$evType) occurred! " + errorMessage).orDie + + // Extra https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState + def onStateChange(state: Websocket.State): UIO[Unit] + + // /** Transmits data to the server over the WebSocket connection. */ + // def send(data: String): UIO[Unit] +} + +object Websocket { + type State = State.Value + + /** @see https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/readyState */ + object State extends Enumeration { + + /** Socket has been created. The connection is not yet open. */ + val CONNECTING = Value(0) + + /** The connection is open and ready to communicate. */ + val OPEN = Value(1) + + /** The connection is in the process of closing. */ + val CLOSING = Value(2) + + /** The connection is closed or couldn't be opened. */ + val CLOSED = Value(3) + } + + // Accessor Methods Inside the Companion Object + def onOpen(evType: String): URIO[Websocket, Unit] = ZIO.serviceWithZIO(_.onOpen(evType)) + def onClose(reason: String): URIO[Websocket, Unit] = ZIO.serviceWithZIO(_.onClose(reason)) + def onMessage(message: String): URIO[Websocket, Unit] = ZIO.serviceWithZIO(_.onMessage(message)) + def onError(evType: String, message: String): URIO[Websocket, Unit] = + ZIO.serviceWithZIO(_.onError(evType: String, message: String)) + def onStateChange(newState: Websocket.State): URIO[Websocket, Unit] = ZIO.serviceWithZIO(_.onStateChange(newState)) + // def send(data: String): URIO[Websocket, Unit] = ZIO.serviceWithZIO(_.send(data)) +} diff --git a/did/shared/src/main/scala/fmgp/did/VerificationMethod.scala b/did/shared/src/main/scala/fmgp/did/VerificationMethod.scala index db1bf627..5635c230 100644 --- a/did/shared/src/main/scala/fmgp/did/VerificationMethod.scala +++ b/did/shared/src/main/scala/fmgp/did/VerificationMethod.scala @@ -48,7 +48,6 @@ object VerificationMethodReferencedWithKey { case class VerificationMethodReferenced(value: String) extends VerificationMethod { def did = DIDSubject(value.split('#').head) - def fromto = FROMTO(value.split('#').head) // FIXME def id = value // TODO rename value to id def fragment = value.split("#", 2).drop(1).head // TODO make it type safe } diff --git a/did/shared/src/main/scala/fmgp/did/comm/Message.scala b/did/shared/src/main/scala/fmgp/did/comm/Message.scala index d7bba848..93bf4008 100644 --- a/did/shared/src/main/scala/fmgp/did/comm/Message.scala +++ b/did/shared/src/main/scala/fmgp/did/comm/Message.scala @@ -261,6 +261,7 @@ case class Recipient( encrypted_key: Base64, header: RecipientHeader, ) { + // TODO MAYBE return type TO instade of DIDSubject def recipientSubject: DIDSubject = header.didSubject def recipientKid: VerificationMethodReferenced = header.kid } diff --git a/did/shared/src/main/scala/fmgp/did/comm/Operations.scala b/did/shared/src/main/scala/fmgp/did/comm/Operations.scala index e8694074..1c93d5da 100644 --- a/did/shared/src/main/scala/fmgp/did/comm/Operations.scala +++ b/did/shared/src/main/scala/fmgp/did/comm/Operations.scala @@ -42,17 +42,17 @@ trait Operations { def authDecrypt(msg: EncryptedMessage): ZIO[Agent & Resolver, DidFail, Message] = authDecryptRaw(msg).flatMap(Operations.parseMessage(_)) - def verify2PlaintextMessage( - msg: SignedMessage - ): ZIO[Operations & Resolver, CryptoFailed, PlaintextMessage] = for { - payload <- verify(msg).flatMap { - case false => ZIO.fail(SignatureVerificationFailed) - case true => ZIO.succeed(msg.payload) - } - plaintextMessage <- payload.content.fromJson[PlaintextMessage] match - case Left(error) => ZIO.fail(CryptoFailToParse(error)) - case Right(value) => ZIO.succeed(value) - } yield plaintextMessage + def verify2PlaintextMessage(msg: SignedMessage): ZIO[Resolver, CryptoFailed, PlaintextMessage] = + for { + payload <- verify(msg).flatMap { + case false => ZIO.fail(SignatureVerificationFailed) + case true => ZIO.succeed(msg.payload) + } + plaintextMessage <- payload.content.fromJson[PlaintextMessage] match + case Left(error) => ZIO.fail(CryptoFailToParse(error)) + case Right(value) => ZIO.succeed(value) + } yield plaintextMessage + } object Operations { diff --git a/did/shared/src/main/scala/fmgp/did/comm/OperationsImp.scala b/did/shared/src/main/scala/fmgp/did/comm/OperationsImp.scala index 4ab5fd5b..5d2101b1 100644 --- a/did/shared/src/main/scala/fmgp/did/comm/OperationsImp.scala +++ b/did/shared/src/main/scala/fmgp/did/comm/OperationsImp.scala @@ -133,7 +133,7 @@ class OperationsImp(cryptoOperations: CryptoOperations) extends Operations { skid = msg.`protected`.obj match case AnonProtectedHeader(epk, apv, typ, enc, alg) => ??? // FIXME case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => skid - doc <- resolver.didDocument(skid.fromto) + doc <- resolver.didDocument(skid.did.asFROMTO) senderKey = doc.keyAgreementAll.find { e => e.vmr == skid }.get // FIXME get data <- cryptoOperations.authDecrypt(senderKey.key, keys, msg) } yield data diff --git a/vite.config.js b/vite.config.js index 3ff5f86c..e7da324c 100644 --- a/vite.config.js +++ b/vite.config.js @@ -25,7 +25,7 @@ export default defineConfig(({ command, mode, ssrBuild }) => { '/ops': 'http://localhost:8080', '/makeKey/X25519': 'http://localhost:8080', '/makeKey/Ed25519': 'http://localhost:8080', - '^/tap/.*': { + '^/ws': { target: 'ws://localhost:8080', ws: true, }, diff --git a/webapp/src/main/scala/fmgp/Utils.scala b/webapp/src/main/scala/fmgp/Utils.scala index 44253faa..0f989c67 100644 --- a/webapp/src/main/scala/fmgp/Utils.scala +++ b/webapp/src/main/scala/fmgp/Utils.scala @@ -59,4 +59,30 @@ object Utils { case Right(value: EncryptedMessage) => ZIO.fail(FailDecryptSignThenEncrypted(sMsg, value)) } } yield response + + def decryptProgram(eMsg: EncryptedMessage): ZIO[Agent & Resolver, DidFail, (EncryptedMessage, PlaintextMessage)] = + OperationsClientRPC + .decryptRaw(eMsg) + .flatMap { data => Operations.parseMessage(data).map((eMsg, _)) } + .flatMap(msg => + msg match + case (eMsg: EncryptedMessage, plaintext: PlaintextMessage) => + ZIO.succeed((eMsg, plaintext)) + case (eMsg: EncryptedMessage, sMsg: SignedMessage) => + verifyProgram(sMsg: SignedMessage).map(e => (eMsg, e._2)) + // sMsg.payload.content.fromJson[Message] match + // case Left(value) => ZIO.fail(FailToParse(value)) + // case Right(plaintext: PlaintextMessage) => // TODO validate + // ZIO.succeed((eMsg, plaintext)) + // case Right(value: SignedMessage) => ZIO.fail(FailDecryptDoubleSign(sMsg, value)) + // case Right(value: EncryptedMessage) => ZIO.fail(FailDecryptSignThenEncrypted(sMsg, value)) + case (outsideMsg: EncryptedMessage, insideMsg: EncryptedMessage) => + ZIO.fail(FailDecryptDoubleEncrypted(outsideMsg, insideMsg)) + ) + + def verifyProgram(sMsg: SignedMessage): ZIO[Resolver, CryptoFailed, (SignedMessage, PlaintextMessage)] = + OperationsClientRPC + .verify2PlaintextMessage(sMsg) + .map { pMsg => (sMsg, pMsg) } + } diff --git a/webapp/src/main/scala/fmgp/Websocket.scala b/webapp/src/main/scala/fmgp/Websocket.scala deleted file mode 100644 index 1eaaf27f..00000000 --- a/webapp/src/main/scala/fmgp/Websocket.scala +++ /dev/null @@ -1,93 +0,0 @@ -package fmgp - -import org.scalajs.dom.{CloseEvent, Event, MessageEvent, WebSocket} - -import scala.scalajs.js -import zio._ - -//case class World(data: String) - -object Websocket { - type State = State.Value - object State extends Enumeration { - - /** Socket has been created. The connection is not yet open. */ - val CONNECTING = Value(0) - - /** The connection is open and ready to communicate. */ - val OPEN = Value(1) - - /** The connection is in the process of closing. */ - val CLOSING = Value(2) - - /** The connection is closed or couldn't be opened. */ - val CLOSED = Value(3) - } - - case class AutoReconnect[WS <: WebsocketJS]( - wsUrl: String, - wsJS: WS, - defualtReconnectDelay: Int = 20000, - var ws: js.UndefOr[WebSocket] = js.undefined, - ) { - println(wsUrl) - connect(0) - - val wsLayer: ULayer[WebsocketJS] = ZLayer.succeed[WebsocketJS](wsJS) - - def getState: State = ws.map(e => State(e.readyState)).getOrElse(State.CLOSED) - - /** @see https://japgolly.github.io/scalajs-react/#examples/websockets */ - private def connect(delay: Int): Unit = { - // log.info(s"WS try reconect to $wsUrl (in ${delay / 1000} s)") - js.timers.setTimeout(delay) { - Unsafe.unsafe { implicit unsafe => // Run side efect - Runtime.default.unsafe.runToFuture(WebsocketJS.onStateChange(getState).provide(wsLayer)) - // .getOrThrowFiberFailure() - } - - val tmpWS = new WebSocket(wsUrl) // TODO Add a timeout here - ws = tmpWS - - tmpWS.onopen = { (ev: Event) => - Unsafe.unsafe { implicit unsafe => // Run side efect - Runtime.default.unsafe.runToFuture({ - WebsocketJS.onStateChange(Websocket.State.OPEN) *> - WebsocketJS.onOpen(ev.`type`) - }.provide(wsLayer)) - // .getOrThrowFiberFailure() - } - } - tmpWS.onclose = { (ev: CloseEvent) => - Unsafe.unsafe { implicit unsafe => // Run side efect - Runtime.default.unsafe.runToFuture( - { - WebsocketJS.onStateChange(Websocket.State.CLOSED) *> - WebsocketJS.onClose(ev.reason) - }.provide(wsLayer) - .map(_ => connect(defualtReconnectDelay)) - ) - // .getOrThrowFiberFailure().map(_ => connect(defualtReconnectDelay)) - } - } - tmpWS.onmessage = { (ev: MessageEvent) => - Unsafe.unsafe { implicit unsafe => // Run side efect - Runtime.default.unsafe.runToFuture(WebsocketJS.onMessage(message = ev.data.toString).provide(wsLayer)) - // .getOrThrowFiberFailure() - } - } - tmpWS.onerror = { (ev: Event) => - val message = ev - .asInstanceOf[js.Dynamic] - .message - .asInstanceOf[js.UndefOr[String]] - .fold("")("Error: " + _) - Unsafe.unsafe { implicit unsafe => // Run side efect - Runtime.default.unsafe.runToFuture(WebsocketJS.onError(ev.`type`, message).provide(wsLayer)) - // .getOrThrowFiberFailure() - } - } - } - } - } -} diff --git a/webapp/src/main/scala/fmgp/WebsocketJS.scala b/webapp/src/main/scala/fmgp/WebsocketJS.scala deleted file mode 100644 index d63f96fd..00000000 --- a/webapp/src/main/scala/fmgp/WebsocketJS.scala +++ /dev/null @@ -1,54 +0,0 @@ -package fmgp - -import zio._ -import zio.json._ -import fmgp.did.comm._ -import com.raquo.airstream.state.Var - -trait WebsocketJS { - def onOpen(evType: String): UIO[Unit] = Console.printLine(s"WS Connected '$evType'").orDie - def onClose(reason: String): UIO[Unit] = Console.printLine(s"WS Closed because '${reason}'").orDie - def onMessage(message: String): UIO[Unit] - def onError(evType: String, errorMessage: String): UIO[Unit] = - Console.printLine(s"WS Error (type:$evType) occurred! " + errorMessage).orDie - - // Extra - def onStateChange(state: Websocket.State): UIO[Unit] -} - -object WebsocketJS { - // Accessor Methods Inside the Companion Object - def onOpen(evType: String): URIO[WebsocketJS, Unit] = ZIO.serviceWithZIO(_.onOpen(evType)) - def onClose(reason: String): URIO[WebsocketJS, Unit] = ZIO.serviceWithZIO(_.onClose(reason)) - def onMessage(message: String): URIO[WebsocketJS, Unit] = ZIO.serviceWithZIO(_.onMessage(message)) - def onError(evType: String, message: String): URIO[WebsocketJS, Unit] = - ZIO.serviceWithZIO(_.onError(evType: String, message: String)) - def onStateChange(newState: Websocket.State): URIO[WebsocketJS, Unit] = ZIO.serviceWithZIO(_.onStateChange(newState)) -} - -case class WebsocketJSLive( - income: Var[Seq[TapMessage]], - state: Var[Option[Websocket.State]], -) extends WebsocketJS { - override def onMessage(message: String): UIO[Unit] = - message.fromJson[TapMessage] match - case Left(ex) => Console.printLine(message).orDie *> Console.printLine(s"Error parsing the obj World: $ex").orDie - case Right(value) => ZIO.succeed(income.update(s => s :+ value)) - - override def onStateChange(newState: Websocket.State): UIO[Unit] = ZIO.succeed(state.update(_ => Some(newState))) -} - -object WebsocketJSLive { - - import scalajs.js.internal.UnitOps.unitOrOps // This shound not be needed - - val wsUrl = - org.scalajs.dom.window.location.origin - .getOrElse("http://localhost:8080") - .replaceFirst("http", "ws") + "/tap/alice.did.fmgp.app" - - val messages = Var[Seq[TapMessage]](initial = Seq.empty) - val state = Var[Option[Websocket.State]](initial = None) - - lazy val autoReconnect = Websocket.AutoReconnect(wsUrl, WebsocketJSLive(messages, state)) -} diff --git a/webapp/src/main/scala/fmgp/webapp/DecryptTool.scala b/webapp/src/main/scala/fmgp/webapp/DecryptTool.scala index 41f241d6..5e1ff8ac 100644 --- a/webapp/src/main/scala/fmgp/webapp/DecryptTool.scala +++ b/webapp/src/main/scala/fmgp/webapp/DecryptTool.scala @@ -14,7 +14,6 @@ import fmgp.did._ import fmgp.did.comm._ import fmgp.did.comm.protocol.basicmessage2.BasicMessage import fmgp.did.method.peer.DIDPeer._ -import fmgp.did.method.peer.DidPeerResolver import fmgp.crypto.error._ import fmgp.did.AgentProvider @@ -63,7 +62,7 @@ object DecryptTool { case (outsideMsg: EncryptedMessage, insideMsg: EncryptedMessage) => ZIO.fail(FailDecryptDoubleEncrypted(outsideMsg, insideMsg)) ) - Utils.runProgram(program.provideEnvironment(ZEnvironment(agent, DidPeerResolver()))) + Utils.runProgram(program.provideSomeLayer(Global.resolverLayer).provideEnvironment(ZEnvironment(agent))) } .observe(owner) diff --git a/webapp/src/main/scala/fmgp/webapp/EncryptTool.scala b/webapp/src/main/scala/fmgp/webapp/EncryptTool.scala index 8b110902..d2aed97e 100644 --- a/webapp/src/main/scala/fmgp/webapp/EncryptTool.scala +++ b/webapp/src/main/scala/fmgp/webapp/EncryptTool.scala @@ -477,9 +477,13 @@ object EncryptTool { .map { case output: String => output.fromJson[EncryptedMessage] match { case Left(value) => outputFromCallVar.set(None) // side efect - case Right(value: EncryptedMessage) => outputFromCallVar.set(Some(value)) // side efect + case Right(value: EncryptedMessage) => outputFromCallVar.set(Some(value)) } } + .tapError { e => + println("ERROR:" + e.toString) // REMOVE + ZIO.logError(e.toString) *> ZIO.succeed(outputFromCallVar.set(None)) // side efect + } .provide(Global.resolverLayer) ) case _ => // None diff --git a/webapp/src/main/scala/fmgp/webapp/Global.scala b/webapp/src/main/scala/fmgp/webapp/Global.scala index 8d811bee..b95374da 100644 --- a/webapp/src/main/scala/fmgp/webapp/Global.scala +++ b/webapp/src/main/scala/fmgp/webapp/Global.scala @@ -5,15 +5,17 @@ import scala.scalajs.js.annotation.JSExport import org.scalajs.dom import com.raquo.laminar.api.L._ import zio._ +import zio.json._ import fmgp.did._ import fmgp.did.comm._ import fmgp.did.method.peer.DIDPeer import fmgp.did.agent.MessageStorage -import fmgp.crypto.error.DidFail +import fmgp.crypto.error._ import scala.util.Try import scala.util.Failure import scala.util.Success +import fmgp.Utils object Global { @@ -113,4 +115,28 @@ object Global { Failure(exception) } + def tryDecryptVerifyReciveMessagePrograme(msg: SignedMessage | EncryptedMessage): ZIO[Resolver, DidFail, Unit] = + for { + _ <- ZIO.log("") + tmpAgentProvider = agentProvider.now() + jobs = msg match + case sMsg: SignedMessage => + Seq( + Utils + .verifyProgram(sMsg) + .map(e => Global.messageRecive(e._1, e._2)) // side effect! + ) + case eMsg: EncryptedMessage => + eMsg.recipientsSubject.toSeq.map { did => + tmpAgentProvider.getAgentByDID(did) match + case None => ZIO.unit + case Some(agent) => + Utils + .decryptProgram(eMsg) + .map(e => Global.messageRecive(e._1, e._2)) // side effect! + .provideSomeEnvironment((e: ZEnvironment[Resolver]) => e ++ ZEnvironment(agent)) + } + job <- ZIO.foreachDiscard(jobs)(e => e) + } yield () // Utils.runProgram(program.provideSomeLayer(Global.resolverLayer)) + } diff --git a/webapp/src/main/scala/fmgp/webapp/Home.scala b/webapp/src/main/scala/fmgp/webapp/Home.scala index 35d27123..3ee48514 100644 --- a/webapp/src/main/scala/fmgp/webapp/Home.scala +++ b/webapp/src/main/scala/fmgp/webapp/Home.scala @@ -30,12 +30,7 @@ object Home { p("Navigate to ", b("Decrypt Tool "), MyRouter.navigateTo(MyRouter.DecryptPage)), p("Navigate to ", b("Basic Message "), MyRouter.navigateTo(MyRouter.BasicMessagePage)), p("Navigate to ", b("Trust Ping "), MyRouter.navigateTo(MyRouter.TrustPingPage)), - p( - "Navigate to ", - b("TapIntoStream Tool "), - MyRouter.navigateTo(MyRouter.TapIntoStreamPage), - " (tap into all Alice's income messages)" - ), + p("Navigate to ", b("TapIntoStream Tool "), MyRouter.navigateTo(MyRouter.TapIntoStreamPage)), // p( // "Navigate to ", // b("Message DB"), diff --git a/webapp/src/main/scala/fmgp/webapp/TapIntoStreamTool.scala b/webapp/src/main/scala/fmgp/webapp/TapIntoStreamTool.scala index c58a3f18..7a073a21 100644 --- a/webapp/src/main/scala/fmgp/webapp/TapIntoStreamTool.scala +++ b/webapp/src/main/scala/fmgp/webapp/TapIntoStreamTool.scala @@ -1,34 +1,69 @@ package fmgp.webapp -import org.scalajs.dom import scala.scalajs.js import scala.scalajs.js.timers._ -import js.JSConverters._ - +import scala.scalajs.js.JSConverters._ +import org.scalajs.dom import com.raquo.laminar.api.L._ + import zio._ import zio.json._ +import zio.stream.ZStream +import fmgp._ import fmgp.did._ import fmgp.did.comm._ import fmgp.did.comm.protocol.basicmessage2.BasicMessage -import fmgp.did.method.peer.DIDPeer._ -import fmgp.did.method.peer.DidPeerResolver -import fmgp.did.method.peer.DIDPeer -import fmgp.Websocket -import fmgp.WebsocketJSLive +import fmgp.util.TransportWSImp +import typings.std.stdStrings.tr object TapIntoStreamTool { - val ws = WebsocketJSLive.autoReconnect + val messages = Var[Seq[String]](Seq.empty) + + val namesAndDIDs = Seq( // FIXME + ("alice", fmgp.did.AgentProvider.alice.id.asDIDSubject), + ("bob", fmgp.did.AgentProvider.bob.id.asDIDSubject), + ("charlie", fmgp.did.AgentProvider.charlie.id.asDIDSubject), + ("local", fmgp.did.AgentProvider.local.id.asDIDSubject), + ("exampleAlice", fmgp.did.AgentProvider.exampleAlice.id.asDIDSubject), + ("exampleBob", fmgp.did.AgentProvider.exampleBob.id.asDIDSubject), + ("localhost8080Alice", fmgp.did.AgentProvider.localhost8080Alice.id.asDIDSubject), + ("localhost9000Alice", fmgp.did.AgentProvider.localhost9000Alice.id.asDIDSubject), + ) + + // val ws = WebsocketJSLive.autoReconnect + def transmissionProgram( + subjects: Seq[DIDSubject] = namesAndDIDs.map(_._2) + ) = for { + _ <- ZIO.log("TransportWS") + _ = messages.set(Seq.empty) // side effect! + transport <- ZIO.service[TransportWSImp[String]] + _ <- transport.send("info") + _ <- ZIO.foreach(subjects)(s => transport.send(s"link ${s.did}")) + fiber <- transport.inbound + .tap(out => ZIO.succeed(messages.update(out +: _))) // side effect! + .mapZIO(e => + e.fromJson[Message] match + case Left(value) => ??? + case Right(eMsg: EncryptedMessage) => Global.tryDecryptVerifyReciveMessagePrograme(eMsg) + case Right(sMsg: SignedMessage) => Global.tryDecryptVerifyReciveMessagePrograme(sMsg) + case Right(pMsg: PlaintextMessage) => ??? + ) + .runCount + .fork + _ <- ZIO.logDebug(s"TransportWS JOB started for ${subjects.map(_.did)}") + _ <- fiber.join.flatMap(l => ZIO.log(s"TransportWS END after $l events")) + } yield () val rootElement = div( onMountCallback { ctx => // job(ctx.owner) + Utils.runProgram(transmissionProgram().provide(TransportWSImp.layer ++ Scope.default ++ Global.resolverLayer)) () }, code("TapIntoStream Tool"), - p("Tap into Alice's stream"), + p(s"Tap into [${namesAndDIDs.map(_._1).mkString(", ")}] DIDs stream: "), br(), div( // container padding := "12pt", @@ -37,11 +72,21 @@ object TapIntoStreamTool { // Some(FROM("did:peer:123456789qwertyuiopasdfghjklzxcvbnm")), // "TODO WIP", // ), - children <-- ws.wsJS.income.signal.map(_.map(_.decrypted).map { msg => - BasicMessage.fromPlaintextMessage(msg) match - case Left(ex) => incomeMgs(msg.id, msg.from, msg.`type`.value) - case Right(bm) => incomeMgs(msg.id, msg.from, bm.content) - }) + children <-- messages.signal // split(_.blockId) + .map(_.map { data => + data.fromJson[EncryptedMessage] match + case Left(value) => p(s"Not EncryptedMessage (value): $data") + case Right(eMsg) => + val mFrom = eMsg.`protected`.obj.match + case AnonProtectedHeader(epk, apv, typ, enc, alg) => None + case AuthProtectedHeader(epk, apv, skid, apu, typ, enc, alg) => Some(skid.did.asFROM) + incomeMgs(MsgID(eMsg.sha256), mFrom, eMsg.toJsonPretty) + }) + // _.map(_.decrypted).map { msg => + // BasicMessage.fromPlaintextMessage(msg) match + // case Left(ex) => incomeMgs(msg.id, msg.from, msg.`type`.value) + // case Right(bm) => incomeMgs(msg.id, msg.from, bm.content) + ), ) @@ -60,7 +105,7 @@ object TapIntoStreamTool { div( div(msgId.value), div(did.map(_.value).getOrElse("")), - div(content), + div(pre(code(content))), ) ) ) @@ -78,7 +123,7 @@ object TapIntoStreamTool { div( div(msgId.value), div(did.map(_.value).getOrElse("")), - div(content), + div(pre(code(content))), // div( // className := "mdc-card__primary-action", // tabIndex := 0,