Skip to content

Commit

Permalink
Add websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Kolmogortsev committed Aug 5, 2024
1 parent 781c86d commit 1f1d5da
Show file tree
Hide file tree
Showing 23 changed files with 724 additions and 47 deletions.
19 changes: 19 additions & 0 deletions modules/core/src/main/scala/muffin/api/ApiClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import fs2.*

import muffin.codec.*
import muffin.http.*
import muffin.http.Method.Get
import muffin.model.*

trait ApiClient[F[_], To[_], From[_]] {
Expand Down Expand Up @@ -173,6 +174,8 @@ trait ApiClient[F[_], To[_], From[_]] {
def updateRole(id: String, permissions: List[String]): F[RoleInfo]

def getRoles(names: List[String]): F[List[RoleInfo]]

def websocket(): F[WebsocketBuilder[F, To, From]]
}

object ApiClient {
Expand Down Expand Up @@ -800,7 +803,23 @@ object ApiClient {
Body.Json(names),
Map("Authorization" -> s"Bearer ${cfg.auth}")
)

// Roles
// WebSocket
/*
Every call is a new websocket connection
*/
def websocket(): F[WebsocketBuilder[F, To, From]] =
Websocket
.ConnectionBuilder
.build(
http,
Map("Authorization" -> s"Bearer ${cfg.auth}"),
codec,
cfg.baseUrl,
cfg.websocketConnection.retryPolicy.backoffSettings
)
// WebSocket

}

Expand Down
25 changes: 24 additions & 1 deletion modules/core/src/main/scala/muffin/api/ClientConfig.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
package muffin.api

case class ClientConfig(baseUrl: String, auth: String, botName: String, serviceUrl: String, perPage: Int = 60)
import scala.concurrent.duration.FiniteDuration

case class ClientConfig(
baseUrl: String,
auth: String,
botName: String,
serviceUrl: String,
websocketConnection: WebsocketConnectionConfig,
perPage: Int = 60
)

case class WebsocketConnectionConfig(
retryPolicy: RetryPolicy
)

case class RetryPolicy(
backoffSettings: BackoffSettings
)

case class BackoffSettings(
initialDelay: FiniteDuration,
maxDelayThreshold: FiniteDuration,
multiply: Int = 2
)
94 changes: 94 additions & 0 deletions modules/core/src/main/scala/muffin/api/Websocket.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package muffin.api

import java.net.URI
import java.net.URISyntaxException

import cats.MonadThrow
import cats.syntax.all.given

import muffin.codec.*
import muffin.http.*
import muffin.model.*
import muffin.model.websocket.domain.*

trait WebsocketBuilder[F[_], To[_], From[_]] {

def addListener[EventData: From](
eventType: EventType,
onEvent: EventData => F[Unit]
): Websocket.ConnectionBuilder[F, To, From]

def connect(): F[Unit]
}

object Websocket {

class ConnectionBuilder[F[_]: MonadThrow, To[_], From[_]] private (
httpClient: HttpClient[F, To, From],
headers: Map[String, String],
codecSupport: CodecSupport[To, From],
uri: URI,
backoffSettings: BackoffSettings,
listeners: List[EventListener[F]] = Nil
) extends WebsocketBuilder[F, To, From] {
import codecSupport.given

def connect(): F[Unit] = httpClient.websocketWithListeners(uri, headers, backoffSettings, listeners)

def addListener[EventData: From](
eventType: EventType,
onEventListener: EventData => F[Unit]
): ConnectionBuilder[F, To, From] =
new ConnectionBuilder[F, To, From](
httpClient,
headers,
codecSupport,
uri,
backoffSettings,
new EventListener[F] {

def onEvent(event: Event[RawJson]): F[Unit] =
if (eventType != event.eventType) { MonadThrow[F].unit }
else {
Decode[EventData].apply(event.data.value).liftTo[F] >>= onEventListener
}

} :: listeners
)

}

object ConnectionBuilder {

def build[F[_]: MonadThrow, To[_], From[_]](
httpClient: HttpClient[F, To, From],
headers: Map[String, String],
codecSupport: CodecSupport[To, From],
baseUrl: String,
backoffSettings: BackoffSettings
): F[WebsocketBuilder[F, To, From]] =
prepareWebsocketUri(baseUrl)
.map(
new ConnectionBuilder[F, To, From](
httpClient,
headers,
codecSupport,
_,
backoffSettings
)
)
.widen[WebsocketBuilder[F, To, From]]

private def prepareWebsocketUri[F[_]: MonadThrow](raw: String): F[URI] = {
val init = URI(raw)
(init.getScheme match {
case "http" => (host: String) => URI(s"ws://$host/api/v4/websocket").pure[F]
case "https" => (host: String) => URI(s"wss://$host/api/v4/websocket").pure[F]
case unkownProtocol =>
(_: String) => (new URISyntaxException(raw, s"unknown schema: $unkownProtocol")).raiseError[F, URI]
})(init.getAuthority)
}

}

}
11 changes: 11 additions & 0 deletions modules/core/src/main/scala/muffin/codec/CodecSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import muffin.error.MuffinError
import muffin.http.*
import muffin.internal.*
import muffin.model.*
import muffin.model.websocket.domain.*

trait Encode[A] {
def apply(obj: A): String
Expand Down Expand Up @@ -612,9 +613,19 @@ trait CodecSupportLow[To[_], From[_]] extends PrimitivesSupport[To, From] {
case id *: message *: props *: EmptyTuple => Post(id, message, props.getOrElse(Props.empty))
}

given EventFrom[A: From]: From[Event[A]] =
parsing
.field[A]("data")
.field[String]("event")
.build {
case eventType *: data *: EmptyTuple => Event(EventType.fromSnakeCase(eventType), data)
}

}

trait PrimitivesSupport[To[_], From[_]] extends NewTypeSupport[To, From] {
given RawJsonFrom: From[RawJson]

given StringTo: To[String]

given StringFrom: From[String]
Expand Down
25 changes: 24 additions & 1 deletion modules/core/src/main/scala/muffin/error/MuffinError.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
package muffin.error

sealed abstract class MuffinError(message: String) extends Throwable(message)
import cats.Show
import cats.data.NonEmptyList
import cats.syntax.option.given

import muffin.model.websocket.domain.EventType

sealed abstract class MuffinError(message: String, cause: Option[Throwable] = None)
extends Throwable(message, cause.orNull)

object MuffinError {

case class Decoding(message: String) extends MuffinError(message)

case class Http(message: String) extends MuffinError(message)

object Websockets {
case class Websocket(message: String) extends MuffinError(message)

case class ListenerError(message: String, eventType: EventType, cause: Throwable)
extends MuffinError(message, cause.some)

object ListenerError {
given Show[ListenerError] = Show.show[ListenerError](_.toString) // todo: where to place
}

case class FailedWebsocketProcessing(errors: NonEmptyList[ListenerError])
extends MuffinError(errors.show[ListenerError])

}

}
18 changes: 17 additions & 1 deletion modules/core/src/main/scala/muffin/http/HttpClient.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package muffin.http

import java.net.URI

import cats.Show
import cats.syntax.all.given

trait HttpClient[F[_], To[_], From[_]] {
import muffin.api.BackoffSettings
import muffin.model.websocket.domain.*

trait HttpClient[F[_], -To[_], From[_]] {

def request[In: To, Out: From](
url: String,
Expand All @@ -13,6 +18,17 @@ trait HttpClient[F[_], To[_], From[_]] {
params: Params => Params = identity
): F[Out]

def websocketWithListeners(
uri: URI,
headers: Map[String, String] = Map.empty,
backoffSettings: BackoffSettings,
listeners: List[EventListener[F]]
): F[Unit]

}

trait EventListener[F[_]] {
def onEvent(event: Event[RawJson]): F[Unit]
}

sealed trait Body[+T]
Expand Down
3 changes: 0 additions & 3 deletions modules/core/src/main/scala/muffin/model/Posts.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,8 @@ case class Post(
// hashtag: Option[String],
// file_ids: List[String],
// pending_post_id: Option[String],
// metadata: PostMetadata
)

case class PostMetadata(reactions: Option[ReactionInfo])

case class Props(attachments: List[Attachment] = Nil)

object Props {
Expand Down
91 changes: 91 additions & 0 deletions modules/core/src/main/scala/muffin/model/websocket/domain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package muffin.model.websocket

import muffin.model.Post

object domain {

opaque type RawJson = String

object RawJson {
def from(s: String): RawJson = s

extension (json: RawJson) {
def value: String = json
}

}

case class Event[A](
eventType: EventType,
data: A
)

enum EventType {
case Hello
case Posted
case AddedToTeam
case AuthenticationChallenge
case ChannelConverted
case ChannelCreated
case ChannelDeleted
case ChannelMemberUpdated
case ChannelUpdated
case ChannelViewed
case ConfigChanged
case DeleteTeam
case DirectAdded
case EmojiAdded
case EphemeralMessage
case GroupAdded
case LeaveTeam
case LicenseChanged
case MemberroleUpdated
case NewUser
case PluginDisabled
case PluginEnabled
case PluginStatusesChanged
case PostDeleted
case PostEdited
case PostUnread
case PreferenceChanged
case PreferencesChanged
case PreferencesDeleted
case ReactionAdded
case ReactionRemoved
case Response
case RoleUpdated
case StatusChange
case Typing
case UpdateTeam
case UserAdded
case UserRemoved
case UserRoleUpdated
case UserUpdated
case DialogOpened
case ThreadUpdated
case ThreadFollowChanged
case ThreadReadChanged
}

object EventType {

def fromSnakeCase(s: String): EventType = {
val tokens = s.split("_").toList.map(_.capitalize)
EventType.valueOf(
tokens.foldLeft(new StringBuilder(tokens.length)) {
(builder, token) => builder.addAll(token)
}
.toString()
)
}

}

}

case class PostedEventData(
channelName: String,
teamId: String,
senderName: String,
post: Post
)
Loading

0 comments on commit 1f1d5da

Please sign in to comment.