Skip to content

Commit

Permalink
WIP Expose relay messages and connection state
Browse files Browse the repository at this point in the history
  • Loading branch information
jmateoac committed Sep 9, 2023
1 parent 17ca6a1 commit a09a960
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 15 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ docker-sources/
/gradle/wrapper/
/.hermit
local.properties
**.DS_Store
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[versions]
acinqSecp256 = "0.10.1"
guava = "32.1.1-jre"
guava = "31.1-jre"
junit = "5.9.3"
kotest = "5.6.2"
# @pin
Expand Down
3 changes: 3 additions & 0 deletions lib/api/lib.api
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public abstract class app/cash/nostrino/client/Relay {
public final fun getDirectMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getNotes ()Lkotlinx/coroutines/flow/Flow;
public final fun getReactions ()Lkotlinx/coroutines/flow/Flow;
public abstract fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getUserMetaData ()Lkotlinx/coroutines/flow/Flow;
public abstract fun send (Lapp/cash/nostrino/model/Event;)V
public abstract fun start ()V
Expand All @@ -34,6 +35,7 @@ public final class app/cash/nostrino/client/RelayClient : app/cash/nostrino/clie
public fun <init> (Ljava/lang/String;Lokhttp3/OkHttpClient;)V
public synthetic fun <init> (Ljava/lang/String;Lokhttp3/OkHttpClient;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow;
public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public fun send (Lapp/cash/nostrino/model/Event;)V
public fun start ()V
public fun stop ()V
Expand Down Expand Up @@ -68,6 +70,7 @@ public final class app/cash/nostrino/client/RelaySet : app/cash/nostrino/client/
public static synthetic fun copy$default (Lapp/cash/nostrino/client/RelaySet;Ljava/util/Set;ILjava/lang/Object;)Lapp/cash/nostrino/client/RelaySet;
public fun equals (Ljava/lang/Object;)Z
public fun getAllEvents ()Lkotlinx/coroutines/flow/Flow;
public fun getRelayMessages ()Lkotlinx/coroutines/flow/Flow;
public final fun getRelays ()Ljava/util/Set;
public fun hashCode ()I
public fun send (Lapp/cash/nostrino/model/Event;)V
Expand Down
15 changes: 10 additions & 5 deletions lib/src/main/kotlin/app/cash/nostrino/client/Relay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package app.cash.nostrino.client

import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.model.EncryptedDm
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
Expand Down Expand Up @@ -52,18 +54,21 @@ abstract class Relay {
/** Unsubscribe from a subscription */
abstract fun unsubscribe(subscription: Subscription)

/** All events transmitted by this relay for our active subscriptions */
/** All messages transmitted by this relay for our active subscriptions */
abstract val relayMessages : Flow<RelayMessage>

/** The subset of [RelayMessage] that only contain messages of type [EventMessage] */
abstract val allEvents: Flow<Event>

/** The subset of allEvents that are of type TextNote */
/** The subset of [allEvents] that are of type [TextNote] */
val notes: Flow<Event> by lazy { allEvents.filter { it.kind == TextNote.kind } }

/** The subset of allEvents that are of type EncryptedDm */
/** The subset of [allEvents] that are of type [EncryptedDm] */
val directMessages: Flow<Event> by lazy { allEvents.filter { it.kind == EncryptedDm.kind } }

/** The subset of allEvents that are of type UserMetaData */
/** The subset of [allEvents] that are of type [UserMetaData] */
val userMetaData: Flow<Event> by lazy { allEvents.filter { it.kind == UserMetaData.kind } }

/** The subset of allEvents that are of type Reaction */
/** The subset of [allEvents] that are of type [Reaction] */
val reactions: Flow<Event> by lazy { allEvents.filter { it.kind == Reaction.kind } }
}
18 changes: 13 additions & 5 deletions lib/src/main/kotlin/app/cash/nostrino/client/RelayClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
Expand All @@ -55,18 +58,21 @@ class RelayClient(
private val jsonListAdapter = moshi.adapter(List::class.java)
private val queuedMessages = MutableSharedFlow<String>(replay = 512)
private val listener = RelayListener(url, this)
private val receivedMessages: Flow<RelayMessage> by lazy { listener.messages() }
private val subscriptions: MutableMap<Subscription, Set<Filter>> = Collections.synchronizedMap(mutableMapOf())

private var connectionState = Disconnected
private var _connectionState = MutableStateFlow(Disconnected)
val connectionState : StateFlow<ConnectionState> get() = _connectionState.asStateFlow() // TODO add tests

private var messageSendingJob: Job? = null
private var socket: WebSocket? = null

override val relayMessages: Flow<RelayMessage> by lazy { listener.messages() }

override fun start() {
if (connectionState == Disconnected) {
if (connectionState.value == Disconnected) {
logger.info { "Connecting to $url" }
socket = client.newWebSocket(Request.Builder().url(url).build(), listener)
connectionState = Connecting
_connectionState.value = Connecting
}
}

Expand All @@ -79,6 +85,8 @@ class RelayClient(
Disconnected -> connect()
Disconnecting -> stopTalking()
}

_connectionState.value = newState
}

override fun stop() {
Expand All @@ -101,7 +109,7 @@ class RelayClient(
send(listOf("CLOSE", subscription.id))
}

override val allEvents: Flow<Event> by lazy { receivedMessages.filterIsInstance<EventMessage>().map { it.event } }
override val allEvents: Flow<Event> by lazy { relayMessages.filterIsInstance<EventMessage>().map { it.event } }

private fun send(message: List<Any>) {
queuedMessages.tryEmit(jsonListAdapter.toJson(message))
Expand Down
20 changes: 16 additions & 4 deletions lib/src/main/kotlin/app/cash/nostrino/client/RelaySet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package app.cash.nostrino.client

import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.map
Expand All @@ -45,16 +48,25 @@ data class RelaySet(
override fun unsubscribe(subscription: Subscription) = relays.forEach { it.unsubscribe(subscription) }

@OptIn(ExperimentalCoroutinesApi::class)
override val allEvents: Flow<Event> by lazy {
override val relayMessages: Flow<RelayMessage> by lazy {
val cache = CacheBuilder.newBuilder()
.maximumSize(4096)
.build<ByteString, Boolean>(CacheLoader.from { _ -> false })
relays.map { it.allEvents }.asFlow()

relays.map { it.relayMessages }.asFlow()
.flattenMerge()
.filterNot { cache.get(it.id) }
.filterNot {
it is EventMessage && cache.get(it.event.id)
}
.map {
cache.put(it.id, true)
if(it is EventMessage) {
cache.put(it.event.id, true)
}
it
}
}

override val allEvents: Flow<Event> by lazy {
relayMessages.filterIsInstance<EventMessage>().map { it.event }
}
}
11 changes: 11 additions & 0 deletions lib/src/test/kotlin/app/cash/nostrino/client/RelaySetTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,22 @@

package app.cash.nostrino.client

import app.cash.nostrino.ArbPrimitive
import app.cash.nostrino.crypto.PubKeyTest.Companion.arbPubKey
import app.cash.nostrino.message.relay.EventMessage
import app.cash.nostrino.message.relay.RelayMessage
import app.cash.nostrino.model.ArbEvent.arbEvent
import app.cash.nostrino.model.ArbEvent.arbSubscriptionId
import app.cash.nostrino.model.Event
import app.cash.nostrino.model.Filter
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.collections.shouldContainExactly
import io.kotest.matchers.shouldBe
import io.kotest.property.Arb
import io.kotest.property.arbitrary.bind
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.next
import io.kotest.property.arbitrary.string
import io.kotest.property.checkAll
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
Expand Down Expand Up @@ -157,5 +163,10 @@ class FakeRelay(val sent: MutableList<Event> = mutableListOf()) : Relay() {
unsubscriptions.add(subscription)
}

override val relayMessages: Flow<RelayMessage>
get() = sent.map { event ->
EventMessage(arbSubscriptionId.next(), event)
}.asFlow()

override val allEvents: Flow<Event> = sent.asFlow()
}

0 comments on commit a09a960

Please sign in to comment.