diff --git a/modules/core/src/main/scala/muffin/api/ApiClient.scala b/modules/core/src/main/scala/muffin/api/ApiClient.scala index 8f7ef01..244c78a 100644 --- a/modules/core/src/main/scala/muffin/api/ApiClient.scala +++ b/modules/core/src/main/scala/muffin/api/ApiClient.scala @@ -8,6 +8,7 @@ import fs2.* import muffin.codec.* import muffin.http.* +import muffin.http.Method.Get import muffin.model.* trait ApiClient[F[_], To[_], From[_]] { @@ -173,6 +174,8 @@ trait ApiClient[F[_], To[_], From[_]] { def updateRole(id: String, permissions: List[String]): F[RoleInfo] def getRoles(names: List[String]): F[List[RoleInfo]] + + def websocket(): F[WebsocketBuilder[F, To, From]] } object ApiClient { @@ -800,7 +803,23 @@ object ApiClient { Body.Json(names), Map("Authorization" -> s"Bearer ${cfg.auth}") ) + // Roles + // WebSocket + /* + Every call is a new websocket connection + */ + def websocket(): F[WebsocketBuilder[F, To, From]] = + Websocket + .ConnectionBuilder + .build( + http, + Map("Authorization" -> s"Bearer ${cfg.auth}"), + codec, + cfg.baseUrl, + cfg.websocketConnection.retryPolicy.backoffSettings + ) + // WebSocket } diff --git a/modules/core/src/main/scala/muffin/api/ClientConfig.scala b/modules/core/src/main/scala/muffin/api/ClientConfig.scala index 9413aa4..42c2b9b 100644 --- a/modules/core/src/main/scala/muffin/api/ClientConfig.scala +++ b/modules/core/src/main/scala/muffin/api/ClientConfig.scala @@ -1,3 +1,26 @@ package muffin.api -case class ClientConfig(baseUrl: String, auth: String, botName: String, serviceUrl: String, perPage: Int = 60) +import scala.concurrent.duration.FiniteDuration + +case class ClientConfig( + baseUrl: String, + auth: String, + botName: String, + serviceUrl: String, + websocketConnection: WebsocketConnectionConfig, + perPage: Int = 60 +) + +case class WebsocketConnectionConfig( + retryPolicy: RetryPolicy +) + +case class RetryPolicy( + backoffSettings: BackoffSettings +) + +case class BackoffSettings( + initialDelay: FiniteDuration, + maxDelayThreshold: FiniteDuration, + multiply: Int = 2 +) diff --git a/modules/core/src/main/scala/muffin/api/Websocket.scala b/modules/core/src/main/scala/muffin/api/Websocket.scala new file mode 100644 index 0000000..ca2e1b3 --- /dev/null +++ b/modules/core/src/main/scala/muffin/api/Websocket.scala @@ -0,0 +1,94 @@ +package muffin.api + +import java.net.URI +import java.net.URISyntaxException + +import cats.MonadThrow +import cats.syntax.all.given + +import muffin.codec.* +import muffin.http.* +import muffin.model.* +import muffin.model.websocket.domain.* + +trait WebsocketBuilder[F[_], To[_], From[_]] { + + def addListener[EventData: From]( + eventType: EventType, + onEvent: EventData => F[Unit] + ): Websocket.ConnectionBuilder[F, To, From] + + def connect(): F[Unit] +} + +object Websocket { + + class ConnectionBuilder[F[_]: MonadThrow, To[_], From[_]] private ( + httpClient: HttpClient[F, To, From], + headers: Map[String, String], + codecSupport: CodecSupport[To, From], + uri: URI, + backoffSettings: BackoffSettings, + listeners: List[EventListener[F]] = Nil + ) extends WebsocketBuilder[F, To, From] { + import codecSupport.given + + def connect(): F[Unit] = httpClient.websocketWithListeners(uri, headers, backoffSettings, listeners) + + def addListener[EventData: From]( + eventType: EventType, + onEventListener: EventData => F[Unit] + ): ConnectionBuilder[F, To, From] = + new ConnectionBuilder[F, To, From]( + httpClient, + headers, + codecSupport, + uri, + backoffSettings, + new EventListener[F] { + + def onEvent(event: Event[RawJson]): F[Unit] = + if (eventType != event.eventType) { MonadThrow[F].unit } + else { + Decode[EventData].apply(event.data.value).liftTo[F] >>= onEventListener + } + + } :: listeners + ) + + } + + object ConnectionBuilder { + + def build[F[_]: MonadThrow, To[_], From[_]]( + httpClient: HttpClient[F, To, From], + headers: Map[String, String], + codecSupport: CodecSupport[To, From], + baseUrl: String, + backoffSettings: BackoffSettings + ): F[WebsocketBuilder[F, To, From]] = + prepareWebsocketUri(baseUrl) + .map( + new ConnectionBuilder[F, To, From]( + httpClient, + headers, + codecSupport, + _, + backoffSettings + ) + ) + .widen[WebsocketBuilder[F, To, From]] + + private def prepareWebsocketUri[F[_]: MonadThrow](raw: String): F[URI] = { + val init = URI(raw) + (init.getScheme match { + case "http" => (host: String) => URI(s"ws://$host/api/v4/websocket").pure[F] + case "https" => (host: String) => URI(s"wss://$host/api/v4/websocket").pure[F] + case unkownProtocol => + (_: String) => (new URISyntaxException(raw, s"unknown schema: $unkownProtocol")).raiseError[F, URI] + })(init.getAuthority) + } + + } + +} diff --git a/modules/core/src/main/scala/muffin/codec/CodecSupport.scala b/modules/core/src/main/scala/muffin/codec/CodecSupport.scala index f85d3fe..1c2ec0a 100644 --- a/modules/core/src/main/scala/muffin/codec/CodecSupport.scala +++ b/modules/core/src/main/scala/muffin/codec/CodecSupport.scala @@ -9,6 +9,7 @@ import muffin.error.MuffinError import muffin.http.* import muffin.internal.* import muffin.model.* +import muffin.model.websocket.domain.* trait Encode[A] { def apply(obj: A): String @@ -612,9 +613,19 @@ trait CodecSupportLow[To[_], From[_]] extends PrimitivesSupport[To, From] { case id *: message *: props *: EmptyTuple => Post(id, message, props.getOrElse(Props.empty)) } + given EventFrom[A: From]: From[Event[A]] = + parsing + .field[A]("data") + .field[String]("event") + .build { + case eventType *: data *: EmptyTuple => Event(EventType.fromSnakeCase(eventType), data) + } + } trait PrimitivesSupport[To[_], From[_]] extends NewTypeSupport[To, From] { + given RawJsonFrom: From[RawJson] + given StringTo: To[String] given StringFrom: From[String] diff --git a/modules/core/src/main/scala/muffin/error/MuffinError.scala b/modules/core/src/main/scala/muffin/error/MuffinError.scala index fff0e50..fd2f747 100644 --- a/modules/core/src/main/scala/muffin/error/MuffinError.scala +++ b/modules/core/src/main/scala/muffin/error/MuffinError.scala @@ -1,10 +1,33 @@ package muffin.error -sealed abstract class MuffinError(message: String) extends Throwable(message) +import cats.Show +import cats.data.NonEmptyList +import cats.syntax.option.given + +import muffin.model.websocket.domain.EventType + +sealed abstract class MuffinError(message: String, cause: Option[Throwable] = None) + extends Throwable(message, cause.orNull) object MuffinError { case class Decoding(message: String) extends MuffinError(message) case class Http(message: String) extends MuffinError(message) + + object Websockets { + case class Websocket(message: String) extends MuffinError(message) + + case class ListenerError(message: String, eventType: EventType, cause: Throwable) + extends MuffinError(message, cause.some) + + object ListenerError { + given Show[ListenerError] = Show.show[ListenerError](_.toString) // todo: where to place + } + + case class FailedWebsocketProcessing(errors: NonEmptyList[ListenerError]) + extends MuffinError(errors.show[ListenerError]) + + } + } diff --git a/modules/core/src/main/scala/muffin/http/HttpClient.scala b/modules/core/src/main/scala/muffin/http/HttpClient.scala index 11e3fbd..1c141f3 100644 --- a/modules/core/src/main/scala/muffin/http/HttpClient.scala +++ b/modules/core/src/main/scala/muffin/http/HttpClient.scala @@ -1,9 +1,14 @@ package muffin.http +import java.net.URI + import cats.Show import cats.syntax.all.given -trait HttpClient[F[_], To[_], From[_]] { +import muffin.api.BackoffSettings +import muffin.model.websocket.domain.* + +trait HttpClient[F[_], -To[_], From[_]] { def request[In: To, Out: From]( url: String, @@ -13,6 +18,17 @@ trait HttpClient[F[_], To[_], From[_]] { params: Params => Params = identity ): F[Out] + def websocketWithListeners( + uri: URI, + headers: Map[String, String] = Map.empty, + backoffSettings: BackoffSettings, + listeners: List[EventListener[F]] + ): F[Unit] + +} + +trait EventListener[F[_]] { + def onEvent(event: Event[RawJson]): F[Unit] } sealed trait Body[+T] diff --git a/modules/core/src/main/scala/muffin/model/Posts.scala b/modules/core/src/main/scala/muffin/model/Posts.scala index 825feb3..125a02d 100644 --- a/modules/core/src/main/scala/muffin/model/Posts.scala +++ b/modules/core/src/main/scala/muffin/model/Posts.scala @@ -23,11 +23,8 @@ case class Post( // hashtag: Option[String], // file_ids: List[String], // pending_post_id: Option[String], -// metadata: PostMetadata ) -case class PostMetadata(reactions: Option[ReactionInfo]) - case class Props(attachments: List[Attachment] = Nil) object Props { diff --git a/modules/core/src/main/scala/muffin/model/websocket/domain.scala b/modules/core/src/main/scala/muffin/model/websocket/domain.scala new file mode 100644 index 0000000..352f0ca --- /dev/null +++ b/modules/core/src/main/scala/muffin/model/websocket/domain.scala @@ -0,0 +1,91 @@ +package muffin.model.websocket + +import muffin.model.Post + +object domain { + + opaque type RawJson = String + + object RawJson { + def from(s: String): RawJson = s + + extension (json: RawJson) { + def value: String = json + } + + } + + case class Event[A]( + eventType: EventType, + data: A + ) + + enum EventType { + case Hello + case Posted + case AddedToTeam + case AuthenticationChallenge + case ChannelConverted + case ChannelCreated + case ChannelDeleted + case ChannelMemberUpdated + case ChannelUpdated + case ChannelViewed + case ConfigChanged + case DeleteTeam + case DirectAdded + case EmojiAdded + case EphemeralMessage + case GroupAdded + case LeaveTeam + case LicenseChanged + case MemberroleUpdated + case NewUser + case PluginDisabled + case PluginEnabled + case PluginStatusesChanged + case PostDeleted + case PostEdited + case PostUnread + case PreferenceChanged + case PreferencesChanged + case PreferencesDeleted + case ReactionAdded + case ReactionRemoved + case Response + case RoleUpdated + case StatusChange + case Typing + case UpdateTeam + case UserAdded + case UserRemoved + case UserRoleUpdated + case UserUpdated + case DialogOpened + case ThreadUpdated + case ThreadFollowChanged + case ThreadReadChanged + } + + object EventType { + + def fromSnakeCase(s: String): EventType = { + val tokens = s.split("_").toList.map(_.capitalize) + EventType.valueOf( + tokens.foldLeft(new StringBuilder(tokens.length)) { + (builder, token) => builder.addAll(token) + } + .toString() + ) + } + + } + +} + +case class PostedEventData( + channelName: String, + teamId: String, + senderName: String, + post: Post +) diff --git a/modules/core/src/test/scala/muffin/api/ApiTest.scala b/modules/core/src/test/scala/muffin/api/ApiTest.scala index ce3f2a1..b7d31d6 100644 --- a/modules/core/src/test/scala/muffin/api/ApiTest.scala +++ b/modules/core/src/test/scala/muffin/api/ApiTest.scala @@ -1,17 +1,23 @@ package muffin.api import java.time.ZoneId +import scala.concurrent.Future +import cats.effect.Deferred import cats.effect.IO +import cats.effect.std.Queue import cats.syntax.all.* +import concurrent.duration.given import org.scalatest.{Assertion, Succeeded, Tag} import org.scalatest.featurespec.AsyncFeatureSpec import muffin.codec.* import muffin.dsl.* +import muffin.error.MuffinError import muffin.http.* import muffin.model.* +import muffin.model.websocket.domain.* trait ApiTest[To[_], From[_]](integration: String, codecSupport: CodecSupport[To, From]) extends ApiTestSupport { protected def httpClient: HttpClient[IO, To, From] @@ -21,20 +27,27 @@ trait ApiTest[To[_], From[_]](integration: String, codecSupport: CodecSupport[To case class AppContext(int: Int, str: String) protected def toContext: To[AppContext] + protected def fromContext: From[AppContext] object AppContext { given Encode[AppContext] = EncodeTo(toContext) + given Decode[AppContext] = DecodeFrom(fromContext) } private given ZoneId = ZoneId.of("UTC") private val config = ClientConfig( - baseUrl = "http/test", + baseUrl = "http://http/test", auth = "auth", botName = "testbot", serviceUrl = "service", + WebsocketConnectionConfig( + RetryPolicy( + BackoffSettings(2.seconds, 6.seconds) + ) + ), perPage = 1 ) @@ -263,4 +276,69 @@ trait ApiTest[To[_], From[_]](integration: String, codecSupport: CodecSupport[To } } + Feature("websocket") { + given From[domain.TestObject] = + parsing + .field[String]("field") + .build[domain.TestObject] { + case field *: EmptyTuple => domain.TestObject.apply(field) + } + + Scenario(s"process websocket event $integration") { + for { + listenedEvent <- Deferred[IO, domain.TestObject] + websocketFiber <- apiClient + .websocket() + .flatMap( + _.addListener[domain.TestObject]( + EventType.Hello, + event => listenedEvent.complete(event).void + ) + .connect() + .start + ) + event <- listenedEvent.get.timeout(2.seconds) + _ <- websocketFiber.join + } yield assert(event == domain.TestObject.default) + } + + Scenario(s"Different connections work independent $integration") { + val badEvent = domain.TestObject("broken") + for { + listenedEvents <- Queue.unbounded[IO, domain.TestObject] + brokenWebsocketFiber <- apiClient + .websocket() + .flatMap( + _.addListener[String]( + EventType.Hello, + event => + listenedEvents.offer(badEvent) + ) + .connect() + .recoverWith { + case _: MuffinError.Decoding => listenedEvents.offer(badEvent) + } + .start + ) + + workingWebsocketFiber <- apiClient + .websocket() + .flatMap( + _.addListener[domain.TestObject]( + EventType.Hello, + event => listenedEvents.offer(event) + ) + .connect() + .start + ) + + events <- listenedEvents.tryTakeN(2.some).timeout(2.second) + _ <- brokenWebsocketFiber.join *> workingWebsocketFiber.join + } yield assert( + events.contains(domain.TestObject.default) && + events.contains(badEvent) + ) + } + } + } diff --git a/modules/core/src/test/scala/muffin/api/ApiTestSupport.scala b/modules/core/src/test/scala/muffin/api/ApiTestSupport.scala index e166213..48ad503 100644 --- a/modules/core/src/test/scala/muffin/api/ApiTestSupport.scala +++ b/modules/core/src/test/scala/muffin/api/ApiTestSupport.scala @@ -60,7 +60,7 @@ trait ApiTestSupport extends AsyncFeatureSpec { protected def testRequest(url: String, method: Method, request: Option[String], params: Params): IO[String] = for { - testObject <- loadResource(s"$url/$method.json") + testObject <- loadResource(s"""${url.replace(raw"http://", "")}/$method.json""") requestBody <- request.traverse[IO, Json](parseJson(_)) expected <- parseJson[List[TestObject]](testObject) diff --git a/modules/core/src/test/scala/muffin/api/domain.scala b/modules/core/src/test/scala/muffin/api/domain.scala new file mode 100644 index 0000000..a4f00d3 --- /dev/null +++ b/modules/core/src/test/scala/muffin/api/domain.scala @@ -0,0 +1,11 @@ +package muffin.api + +object domain { + + case class TestObject(field: String) + + object TestObject { + val default: TestObject = TestObject("default") + } + +} diff --git a/modules/core/src/test/scala/muffin/model/websocket/EntityTypeParsingTest.scala b/modules/core/src/test/scala/muffin/model/websocket/EntityTypeParsingTest.scala new file mode 100644 index 0000000..e2b2b82 --- /dev/null +++ b/modules/core/src/test/scala/muffin/model/websocket/EntityTypeParsingTest.scala @@ -0,0 +1,77 @@ +package muffin.model.websocket + +import scala.concurrent.Future +import scala.util.Try + +import muffin.api.ApiTestSupport +import muffin.model.websocket.domain.* + +class EntityTypeParsingTest() extends ApiTestSupport { + + private val integration = "parsing" + + Feature(s"EntityType parsing $integration") { + Scenario(s"Parse from snake case, correct raw types $integration") { + val rawTypes = List( + "added_to_team", + "authentication_challenge", + "channel_converted", + "channel_created", + "channel_deleted", + "channel_member_updated", + "channel_updated", + "channel_viewed", + "config_changed", + "delete_team", + "direct_added", + "emoji_added", + "ephemeral_message", + "group_added", + "hello", + "leave_team", + "license_changed", + "memberrole_updated", + "new_user", + "plugin_disabled", + "plugin_enabled", + "plugin_statuses_changed", + "post_deleted", + "post_edited", + "post_unread", + "posted", + "preference_changed", + "preferences_changed", + "preferences_deleted", + "reaction_added", + "reaction_removed", + "response", + "role_updated", + "status_change", + "typing", + "update_team", + "user_added", + "user_removed", + "user_role_updated", + "user_updated", + "dialog_opened", + "thread_updated", + "thread_follow_changed", + "thread_read_changed" + ) + + val result: List[EventType] = rawTypes.map(EventType.fromSnakeCase) + + Future.successful(assert(result.length == EventType.values.length)) + } + Scenario(s"Parse from snake case, incorrect kebab case raw types $integration") { + val rawTypes = List( + "added-to-team" + ) + + val result = rawTypes.map(tpe => Try(EventType.fromSnakeCase(tpe))) + + Future.successful(assert(result.head.isFailure)) + } + } + +} diff --git a/modules/examples/http4s-sttp-circe/build.sbt b/modules/examples/http4s-sttp-circe/build.sbt index 53a5e8c..42bc0f2 100644 --- a/modules/examples/http4s-sttp-circe/build.sbt +++ b/modules/examples/http4s-sttp-circe/build.sbt @@ -1,3 +1,2 @@ - -libraryDependencies += "com.softwaremill.sttp.client3" %% "async-http-client-backend-cats" % "3.8.15" -libraryDependencies += "org.http4s" %% "http4s-ember-server" % "1.0.0-M39" +libraryDependencies += "com.softwaremill.sttp.client3" %% "fs2" % "3.9.7" +libraryDependencies += "org.http4s" %% "http4s-ember-server" % "1.0.0-M39" diff --git a/modules/examples/http4s-sttp-circe/src/main/scala/muffin/Application.scala b/modules/examples/http4s-sttp-circe/src/main/scala/muffin/Application.scala index 35c21b6..61915b4 100644 --- a/modules/examples/http4s-sttp-circe/src/main/scala/muffin/Application.scala +++ b/modules/examples/http4s-sttp-circe/src/main/scala/muffin/Application.scala @@ -1,6 +1,7 @@ package muffin import java.time.{LocalDateTime, ZoneId} +import scala.concurrent.duration.given import cats.effect.* import cats.effect.IO.given @@ -9,7 +10,7 @@ import com.comcast.ip4s.* import io.circe.{Decoder, Encoder} import org.http4s.ember.server.* import org.http4s.server.Router -import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend +import sttp.client3.httpclient.fs2.HttpClientFs2Backend import muffin.api.* import muffin.dsl.* @@ -31,23 +32,37 @@ class SimpleCommandHandler(api: Api) { object Application extends IOApp { def run(args: List[String]): IO[ExitCode] = - for { - backend <- AsyncHttpClientCatsBackend[IO]() - client <- SttpClient(backend, codec) - - given ZoneId = ZoneId.systemDefault() - cfg = ClientConfig("url", "token", "botname", "url") - api = new ApiClient.Live(client, cfg)(codec) - - handler = SimpleCommandHandler(api) - timeHandle = handle(handler, "kek").command(_.time) - - router <- timeHandle.in[IO, IO]() - - routes = Http4sRoute.routes(router) - - _ <- EmberServerBuilder.default[IO].withHost(ipv4"0.0.0.0").withPort(port"8080") - .withHttpApp(Router("/" -> routes).orNotFound).build.use(_ => IO.never) - } yield ExitCode.Success + HttpClientFs2Backend.resource[IO]().use(backend => + for { + client <- SttpClient(backend, codec) + + given ZoneId = ZoneId.systemDefault() + cfg = ClientConfig( + "url", + "token", + "botname", + "url", + WebsocketConnectionConfig( + RetryPolicy( + BackoffSettings( + 1.second, + 4.second + ) + ) + ) + ) + api = new ApiClient.Live(client, cfg)(codec) + + handler = SimpleCommandHandler(api) + timeHandle = handle(handler, "kek").command(_.time) + + router <- timeHandle.in[IO, IO]() + + routes = Http4sRoute.routes(router) + + _ <- EmberServerBuilder.default[IO].withHost(ipv4"0.0.0.0").withPort(port"8080") + .withHttpApp(Router("/" -> routes).orNotFound).build.use(_ => IO.never) + } yield ExitCode.Success + ) } diff --git a/modules/examples/zhttp-zjson/src/main/scala/muffin/Application.scala b/modules/examples/zhttp-zjson/src/main/scala/muffin/Application.scala index c260664..1290fad 100644 --- a/modules/examples/zhttp-zjson/src/main/scala/muffin/Application.scala +++ b/modules/examples/zhttp-zjson/src/main/scala/muffin/Application.scala @@ -1,6 +1,7 @@ package muffin import java.time.{LocalDateTime, ZoneId} +import scala.concurrent.duration.given import cats.effect.* @@ -30,7 +31,20 @@ object Application extends ZIOAppDefault { (for { client <- ZioClient[Any, Task, JsonEncoder, JsonDecoder](codec) - cfg = ClientConfig("url", "token", "botname", "url") + cfg = ClientConfig( + "url", + "token", + "botname", + "url", + WebsocketConnectionConfig( + RetryPolicy( + BackoffSettings( + 1.second, + 4.second + ) + ) + ) + ) given ZoneId = ZoneId.systemDefault() @@ -38,11 +52,11 @@ object Application extends ZIOAppDefault { handler = SimpleCommandHandler(api) - router <- handle(handler, "kek").command(_.time).in[RHttp[Client], Task]() + router <- handle(handler, "kek").command(_.time).in[RHttp[Client & Scope], Task]() routes = ZioRoutes.routes(router) _ <- Server.serve(routes) } yield _root_.zio.ExitCode.success) - .provide(Server.default, Client.default) + .provide(Server.default, Client.default, Scope.default) } diff --git a/modules/integration/circe-json-interop/src/main/scala/muffin/interop/json/circe/codec.scala b/modules/integration/circe-json-interop/src/main/scala/muffin/interop/json/circe/codec.scala index 8745fef..35b57ad 100644 --- a/modules/integration/circe-json-interop/src/main/scala/muffin/interop/json/circe/codec.scala +++ b/modules/integration/circe-json-interop/src/main/scala/muffin/interop/json/circe/codec.scala @@ -13,6 +13,7 @@ import io.circe.syntax.given import muffin.codec.* import muffin.error.MuffinError import muffin.http.Body +import muffin.model.websocket.domain.RawJson object codec extends codecLow1 @@ -44,6 +45,8 @@ trait CodecLow2 extends CodecSupport[Encoder, Decoder] { given UnitFrom: Decoder[Unit] = _ => Right(()) + given RawJsonFrom: Decoder[RawJson] = Decoder.decodeJsonObject.map(obj => RawJson.from(obj.asJson.toString)) + given StringTo: Encoder[String] = Encoder.encodeString given StringFrom: Decoder[String] = Decoder.decodeString diff --git a/modules/integration/circe-json-interop/src/test/scala/muffin/interop/json/circe/CirceApiTest.scala b/modules/integration/circe-json-interop/src/test/scala/muffin/interop/json/circe/CirceApiTest.scala index d634c49..a494cea 100644 --- a/modules/integration/circe-json-interop/src/test/scala/muffin/interop/json/circe/CirceApiTest.scala +++ b/modules/integration/circe-json-interop/src/test/scala/muffin/interop/json/circe/CirceApiTest.scala @@ -1,5 +1,7 @@ package muffin.interop.json.circe +import java.net.URI + import cats.effect.IO import cats.syntax.all.* @@ -7,14 +9,19 @@ import io.circe.* import org.scalatest.* import org.scalatest.featurespec.AsyncFeatureSpec -import muffin.api.ApiTest -import muffin.http.{Body, HttpClient, Method, Params} +import muffin.api.* +import muffin.http.{Body, EventListener, HttpClient, Method, Params} +import muffin.model.websocket.domain.{Event, EventType} +import muffin.model.websocket.domain.RawJson class CirceApiTest extends ApiTest[Encoder, Decoder]("circe", codec) { protected def toContext: Encoder[AppContext] = io.circe.Derivation.summonEncoder[AppContext] protected def fromContext: Decoder[AppContext] = io.circe.Derivation.summonDecoder[AppContext] + given Decoder[domain.TestObject] = Decoder.derived + given Encoder[domain.TestObject] = Encoder.AsObject.derived + protected def httpClient: HttpClient[IO, Encoder, Decoder] = new HttpClient[IO, Encoder, Decoder] { @@ -34,6 +41,20 @@ class CirceApiTest extends ApiTest[Encoder, Decoder]("circe", codec) { case Body.Multipart(parts) => ??? }).flatMap(parseJson(_)) + def websocketWithListeners( + uri: URI, + headers: Map[String, String], + backoffSettings: BackoffSettings, + listeners: List[EventListener[IO]] + ): IO[Unit] = events.traverse_(event => listeners.traverse_(_.onEvent(event))) + } + private val events = List( + Event( + EventType.Hello, + RawJson.from(Encoder[domain.TestObject].apply(domain.TestObject.default).toString) + ) + ) + } diff --git a/modules/integration/sttp-http-interop/build.sbt b/modules/integration/sttp-http-interop/build.sbt index f2fedc4..63361ed 100644 --- a/modules/integration/sttp-http-interop/build.sbt +++ b/modules/integration/sttp-http-interop/build.sbt @@ -1,4 +1,4 @@ import Dependencies._ -libraryDependencies ++= sttp :: Nil +libraryDependencies ++= sttp diff --git a/modules/integration/sttp-http-interop/src/main/scala/muffin/interop/http/sttp/SttpClient.scala b/modules/integration/sttp-http-interop/src/main/scala/muffin/interop/http/sttp/SttpClient.scala index 3287594..053fcfa 100644 --- a/modules/integration/sttp-http-interop/src/main/scala/muffin/interop/http/sttp/SttpClient.scala +++ b/modules/integration/sttp-http-interop/src/main/scala/muffin/interop/http/sttp/SttpClient.scala @@ -1,19 +1,31 @@ package muffin.interop.http.sttp -import cats.MonadThrow -import cats.effect.Sync +import java.net.URI +import scala.util.chaining.given + +import cats.{MonadThrow, Parallel} +import cats.data.NonEmptyList +import cats.effect.{Sync, Temporal} import cats.syntax.all.given +import fs2.* +import sttp.capabilities.WebSockets +import sttp.capabilities.fs2.Fs2Streams import sttp.client3.* import sttp.model.{Method as SMethod, Uri} +import sttp.ws.WebSocketFrame +import muffin.api.BackoffSettings import muffin.codec.* import muffin.error.MuffinError import muffin.http.* import muffin.internal.syntax.* +import muffin.model.websocket.domain.* -class SttpClient[F[_]: MonadThrow, To[_], From[_]](backend: SttpBackend[F, Any], codec: CodecSupport[To, From]) - extends HttpClient[F, To, From] { +class SttpClient[F[_]: Temporal: Parallel, To[_], From[_]]( + backend: SttpBackend[F, Fs2Streams[F] & WebSockets], + codec: CodecSupport[To, From] +) extends HttpClient[F, To, From] { import codec.given @@ -70,12 +82,86 @@ class SttpClient[F[_]: MonadThrow, To[_], From[_]](backend: SttpBackend[F, Any], } } + def websocketWithListeners( + uri: URI, + headers: Map[String, String] = Map.empty, + backoffSettings: BackoffSettings, + listeners: List[EventListener[F]] = Nil + ): F[Unit] = { + val websocketEventProcessing: Pipe[F, WebSocketFrame.Data[?], WebSocketFrame] = { input => + input.flatMap { + case WebSocketFrame.Text(payload, _, _) => + Stream.eval( + Decode[Event[RawJson]].apply(payload).liftTo[F] >>= { + event => + listeners + .parTraverse( + _.onEvent(event) + .attempt + .map( + _.leftMap(err => + MuffinError.Websockets.ListenerError(err.getMessage, event.eventType, err) + ) + ) + ) >>= { + _.collect { case Left(err) => err } + .pipe(NonEmptyList.fromList) + .traverse_( + MuffinError.Websockets.FailedWebsocketProcessing(_).raiseError[F, Unit] + ) + } + } + ) *> + Stream.empty + + case _ => Stream.empty + } + } + + val request = basicRequest + .headers(headers) + .response( + asWebSocketStream(Fs2Streams[F])( + websocketEventProcessing + ) + .mapLeft(MuffinError.Websockets.Websocket(_)) + ) + .get(uri"${uri.toString}") + .send(backend) + .map(_.body) + .flatMap { + case Left(error) => MonadThrow[F].raiseError(error) + case Right(value) => value.pure[F] + } + + retryWithBackoff(request, backoffSettings) + + } + + private def retryWithBackoff[A](f: F[A], backoffSettings: BackoffSettings): F[A] = + f + .handleErrorWith { + case _: SttpClientException.ConnectException | + _: SttpClientException.TimeoutException | + _: SttpClientException.ReadException => + Temporal[F].sleep( + backoffSettings.initialDelay min backoffSettings.maxDelayThreshold + ) *> retryWithBackoff( + f, + backoffSettings + .copy(initialDelay = + (backoffSettings.initialDelay * backoffSettings.multiply) min backoffSettings.maxDelayThreshold + ) + ) + } + .flatMap(_ => retryWithBackoff(f, backoffSettings)) + } object SttpClient { - def apply[I[_]: Sync, F[_]: MonadThrow, To[_], From[_]]( - backend: SttpBackend[F, Any], + def apply[I[_]: Sync, F[_]: Temporal: Parallel, To[_], From[_]]( + backend: SttpBackend[F, Fs2Streams[F] & WebSockets], codec: CodecSupport[To, From] ): I[SttpClient[F, To, From]] = Sync[I].delay(new SttpClient[F, To, From](backend, codec)) diff --git a/modules/integration/zio-http-interop/src/main/scala/muffin/interop/http/zio/ZioClient.scala b/modules/integration/zio-http-interop/src/main/scala/muffin/interop/http/zio/ZioClient.scala index a25e82c..60825dc 100644 --- a/modules/integration/zio-http-interop/src/main/scala/muffin/interop/http/zio/ZioClient.scala +++ b/modules/integration/zio-http-interop/src/main/scala/muffin/interop/http/zio/ZioClient.scala @@ -1,16 +1,26 @@ package muffin.interop.http.zio +import java.net.ConnectException +import java.net.URI import java.nio.charset.Charset +import scala.util.chaining.given +import cats.data.NonEmptyList import cats.effect.Sync +import cats.syntax.all.* import zio.* import zio.http.{Body as ZBody, Method as ZMethod, *} +import zio.http.ChannelEvent.Read +import muffin.api.BackoffSettings import muffin.codec.* +import muffin.error.MuffinError import muffin.http.* +import muffin.model.websocket.domain.* -class ZioClient[R, To[_], From[_]](codec: CodecSupport[To, From]) extends HttpClient[RHttp[R with Client], To, From] { +class ZioClient[R, To[_], From[_]](codec: CodecSupport[To, From]) + extends HttpClient[RHttp[R with Client with Scope], To, From] { import codec.given @@ -66,6 +76,68 @@ class ZioClient[R, To[_], From[_]](codec: CodecSupport[To, From]) extends HttpCl } } yield res + def websocketWithListeners( + uri: URI, + headers: Map[String, String], + backoffSettings: BackoffSettings, + listeners: List[EventListener[RHttp[R with Client with Scope]]] + ): RIO[R with Client with Scope, Unit] = { + val retryPolicy = + Schedule.exponential( + Duration.fromScala(backoffSettings.initialDelay), + backoffSettings.multiply + ) || Schedule.fixed( + Duration.fromScala(backoffSettings.maxDelayThreshold) + ) + Handler.webSocket { channel => + channel + .receiveAll { + case Read(WebSocketFrame.Text(payload)) => + Decode[Event[RawJson]].apply(payload) match { + case Left(decoding) => ZIO.fail(decoding) + case Right(event) => + ZIO.foreachPar(listeners) { + _.onEvent(event) + .either + .map( + _.leftMap(err => + MuffinError.Websockets.ListenerError(err.getMessage, event.eventType, err) + ) + ) + } + .flatMap { + res => + ZIO.foreach( + res.collect { case Left(err) => err } + .pipe(NonEmptyList.fromList) + )(errs => + ZIO.fail( + MuffinError.Websockets.FailedWebsocketProcessing(errs) + ) + ) + } + } + + case _ => ZIO.unit + } + } + .connect( + uri.toString, + Headers( + headers.toList.foldLeft(List.empty[Header]) { + case (acc, (k, v)) => Header.Custom(k, v) :: acc + } + ) + ) + .retry( + retryPolicy && Schedule.recurWhile[Throwable] { + case err: ConnectException => true + case _ => false + } + ) + .unit + } + } object ZioClient { diff --git a/modules/integration/zio-json-interop/src/main/scala/muffin/interop/json/zio/codec.scala b/modules/integration/zio-json-interop/src/main/scala/muffin/interop/json/zio/codec.scala index d7b3904..26fabcd 100644 --- a/modules/integration/zio-json-interop/src/main/scala/muffin/interop/json/zio/codec.scala +++ b/modules/integration/zio-json-interop/src/main/scala/muffin/interop/json/zio/codec.scala @@ -10,6 +10,7 @@ import zio.json.internal.* import muffin.codec.* import muffin.error.MuffinError import muffin.http.Body +import muffin.model.websocket.domain.RawJson object codec extends CodecLow { given NothingTo: JsonEncoder[Nothing] = UnitTo.asInstanceOf[JsonEncoder[Nothing]] @@ -27,6 +28,8 @@ trait CodecLow extends CodecSupport[JsonEncoder, JsonDecoder] { given UnitFrom: JsonDecoder[Unit] = (_, _) => () + given RawJsonFrom: JsonDecoder[RawJson] = JsonDecoder[Json].map(json => RawJson.from(json.toJson)) + given StringTo: JsonEncoder[String] = JsonEncoder.string given StringFrom: JsonDecoder[String] = JsonDecoder.string diff --git a/modules/integration/zio-json-interop/src/test/scala/muffin/interop/json/zio/ZioApiTest.scala b/modules/integration/zio-json-interop/src/test/scala/muffin/interop/json/zio/ZioApiTest.scala index 8c83126..f01ea57 100644 --- a/modules/integration/zio-json-interop/src/test/scala/muffin/interop/json/zio/ZioApiTest.scala +++ b/modules/integration/zio-json-interop/src/test/scala/muffin/interop/json/zio/ZioApiTest.scala @@ -1,21 +1,27 @@ package muffin.interop.json.zio +import java.net.URI + import cats.effect.IO import cats.syntax.all.* import org.scalatest.* import org.scalatest.featurespec.AsyncFeatureSpec +import zio.* import zio.json.* -import muffin.api.ApiTest -import muffin.http.{Body, HttpClient, Method, Params} +import muffin.api.{domain, ApiTest, BackoffSettings} +import muffin.http.{Body, EventListener, HttpClient, Method, Params} +import muffin.model.websocket.domain.* class ZioApiTest extends ApiTest[JsonEncoder, JsonDecoder]("zio", codec) { protected def toContext: JsonEncoder[AppContext] = JsonEncoder.derived[AppContext] protected def fromContext: JsonDecoder[AppContext] = JsonDecoder.derived[AppContext] - protected def httpClient: HttpClient[IO, JsonEncoder, JsonDecoder] = + protected def httpClient: HttpClient[IO, JsonEncoder, JsonDecoder] = { + given JsonEncoder[domain.TestObject] = JsonEncoder.derived + new HttpClient[IO, JsonEncoder, JsonDecoder] { def request[In: JsonEncoder, Out: JsonDecoder]( @@ -38,6 +44,21 @@ class ZioApiTest extends ApiTest[JsonEncoder, JsonDecoder]("zio", codec) { } ) + def websocketWithListeners( + uri: URI, + headers: Map[String, String], + backoffSettings: BackoffSettings, + listeners: List[EventListener[IO]] + ): IO[Unit] = events.traverse_(event => listeners.traverse_(_.onEvent(event))) + + private val events = List( + Event( + EventType.Hello, + RawJson.from(domain.TestObject.default.toJson) + ) + ) + } + } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 66b1bb6..5610385 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -25,7 +25,10 @@ object Dependencies { val scalatest = "3.2.15" } - val sttp = "com.softwaremill.sttp.client3" %% "core" % Version.sttp + val sttp = Seq( + "com.softwaremill.sttp.client3" %% "core" % Version.sttp, + "com.softwaremill.sttp.client3" %% "async-http-client-backend-fs2" % Version.sttp + ) object http4s { val core = "org.http4s" %% "http4s-core" % Version.http4s