Skip to content

Commit

Permalink
custom ws subscriptions for vara (#334)
Browse files Browse the repository at this point in the history
  • Loading branch information
a10zn8 authored Nov 3, 2023
1 parent 66b9b00 commit 160e5ae
Show file tree
Hide file tree
Showing 14 changed files with 155 additions and 49 deletions.
18 changes: 13 additions & 5 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.emeraldpay.dshackle.rpc
import com.google.protobuf.ByteString
import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.api.proto.BlockchainOuterClass.NativeSubscribeReplyItem
import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.SilentException
Expand Down Expand Up @@ -57,8 +56,11 @@ open class NativeSubscribe(

fun start(request: BlockchainOuterClass.NativeSubscribeRequest): Publisher<ResponseHolder> {
val chain = Chain.byId(request.chainValue)
if (chain.type != BlockchainType.ETHEREUM) {
return Mono.error(UnsupportedOperationException("Native subscribe is not supported for ${chain.chainCode}"))

val multistream = getUpstream(chain)

if (!multistream.getSubscriptionTopics().contains(request.method)) {
return Mono.error(UnsupportedOperationException("subscribe ${request.method} is not supported for ${chain.chainCode}"))
}

val nonce = request.nonce.takeIf { it != 0L }
Expand All @@ -69,7 +71,7 @@ open class NativeSubscribe(
* If not possible - performs subscription logic on the current instance
* @see EthereumLikeMultistream.tryProxySubscribe
*/
val publisher = getUpstream(chain).tryProxySubscribe(matcher, request) ?: run {
val publisher = multistream.tryProxySubscribe(matcher, request) ?: run {
val method = request.method
val params: Any? = request.payload?.takeIf { !it.isEmpty }?.let {
objectMapper.readValue(it.newInput(), Map::class.java)
Expand Down Expand Up @@ -110,7 +112,13 @@ open class NativeSubscribe(
if (holder.response is NativeSubscribeReplyItem) {
return holder.response
}
val result = objectMapper.writeValueAsBytes(holder.response)

val result = if (holder.response is ByteArray) {
holder.response
} else {
objectMapper.writeValueAsBytes(holder.response)
}

val builder = NativeSubscribeReplyItem.newBuilder()
.setPayload(ByteString.copyFrom(result))

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import kotlin.concurrent.withLock
abstract class Multistream(
val chain: Chain,
val caches: Caches,
) : Upstream, Lifecycle, HasEgressSubscription {
) : Upstream, Lifecycle {

abstract fun getUpstreams(): MutableList<out Upstream>
abstract fun addUpstreamInternal(u: Upstream)
Expand Down Expand Up @@ -498,4 +498,6 @@ abstract class Multistream(
return map.values.min()
}
}

abstract fun getEgressSubscription(): EgressSubscription
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ import io.emeraldpay.etherjar.rpc.RpcException
*/
class DefaultPolkadotMethods : CallMethods {

companion object {
val subs = setOf(
Pair("subscribe_newHead", "unsubscribe_newHead"),
Pair("chain_subscribeAllHeads", "chain_unsubscribeAllHeads"),
Pair("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads"),
Pair("chain_subscribeNewHeads", "chain_unsubscribeNewHeads"),
Pair("chain_subscribeRuntimeVersion", "chain_unsubscribeNewHeads"),
Pair("chain_subscribeRuntimeVersion", "chain_unsubscribeRuntimeVersion"),
)
}

private val all = setOf(
"author_pendingExtrinsics",
"author_removeExtrinsic",
Expand All @@ -37,16 +48,6 @@ class DefaultPolkadotMethods : CallMethods {
"chain_getHead",
"chain_getHeader",
"chain_getRuntimeVersion",
"chain_subscribeAllHeads",
"chain_subscribeFinalizedHeads",
"chain_subscribeNewHeads",
"chain_subscribeRuntimeVersion",
"chain_unsubscribeAllHeads",
"chain_unsubscribeFinalisedHeads",
"chain_unsubscribeFinalizedHeads",
"chain_unsubscribeNewHead",
"chain_unsubscribeNewHeads",
"chain_unsubscribeRuntimeVersion",
"childstate_getKeys",
"childstate_getKeysPaged",
"childstate_getKeysPagedAt",
Expand Down Expand Up @@ -87,9 +88,7 @@ class DefaultPolkadotMethods : CallMethods {
"state_queryStorageAt",
"state_traceBlock",
"state_trieMigrationStatus",
"subscribe_newHead",
"system_chain",
"unsubscribe_newHead",
)

private val add = setOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.Capability
import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.AggregatedPendingTxes
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource
import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder
Expand Down Expand Up @@ -91,4 +93,8 @@ object EthereumChainSpecific : ChainSpecific {
}
return upstream.getIngressSubscription().getAvailableTopics().plus(subs).toSet().toList()
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return EthereumWsIngressSubscription(ws)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@ package io.emeraldpay.dshackle.upstream.ethereum.subscribe
import com.github.benmanes.caffeine.cache.Caffeine
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.etherjar.domain.TransactionId
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import java.time.Duration

class AggregatedPendingTxes(
private val sources: List<PendingTxesSource>,
) : PendingTxesSource {

companion object {
private val log = LoggerFactory.getLogger(AggregatedPendingTxes::class.java)
}

private val track = Caffeine.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.maximumSize(10_000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.polkadot.PolkadotChainSpecific
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific
Expand Down Expand Up @@ -52,6 +54,8 @@ interface ChainSpecific {
fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector?

fun subscriptionTopics(upstream: GenericUpstream): List<String>

fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription
}

object ChainSpecificRegistry {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription
import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead
import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator
import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool
import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptionsImpl
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.generic.ChainSpecific
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcWsClient
Expand All @@ -32,7 +30,7 @@ class GenericWsConnector(
private val pool: WsConnectionPool
private val reader: JsonRpcReader
private val head: GenericWsHead
private val subscriptions: EthereumIngressSubscription
private val subscriptions: IngressSubscription
private val liveness: HeadLivenessValidator
init {
pool = wsFactory.create(upstream)
Expand All @@ -49,7 +47,7 @@ class GenericWsConnector(
chainSpecific,
)
liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId())
subscriptions = EthereumWsIngressSubscription(wsSubscriptions)
subscriptions = chainSpecific.makeIngressSubscription(wsSubscriptions)
}

override fun hasLiveSubscriptionHead(): Flux<Boolean> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright (c) 2022 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.upstream.generic.subscribe

import io.emeraldpay.dshackle.commons.DurableFlux
import io.emeraldpay.dshackle.commons.SharedFluxHolder
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.SubscriptionConnect
import reactor.core.publisher.Flux
import java.time.Duration

abstract class GenericPersistentConnect : SubscriptionConnect<Any> {

private val connectionSource = DurableFlux
.newBuilder()
.using(::createConnection)
.backoffOnError(Duration.ofMillis(100), 1.5, Duration.ofSeconds(60))
.build()
private val holder = SharedFluxHolder(
connectionSource::connect,
)

override fun connect(matcher: Selector.Matcher): Flux<Any> {
return holder.get()
}

abstract fun createConnection(): Flux<Any>
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import io.emeraldpay.dshackle.foundation.ChainOptions.Options
import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.NoopCachingReader
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder
import io.emeraldpay.dshackle.upstream.generic.ChainSpecific
import io.emeraldpay.dshackle.upstream.generic.GenericUpstream
Expand Down Expand Up @@ -73,7 +74,7 @@ object PolkadotChainSpecific : ChainSpecific {
}

override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription {
return { _ -> EmptyEgressSubscription }
return { ms -> PolkadotEgressSubscription(ms, headScheduler) }
}

override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder {
Expand All @@ -96,6 +97,10 @@ object PolkadotChainSpecific : ChainSpecific {
override fun subscriptionTopics(upstream: GenericUpstream): List<String> {
return emptyList()
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return PolkadotIngressSubscription(ws)
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.emeraldpay.dshackle.upstream.polkadot

import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Selector.Matcher
import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods
import io.emeraldpay.dshackle.upstream.generic.GenericUpstream
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler

class PolkadotEgressSubscription(
val upstream: Multistream,
val scheduler: Scheduler,
) : EgressSubscription {
override fun getAvailableTopics(): List<String> {
return DefaultPolkadotMethods.subs.map { it.first }
}

override fun subscribe(topic: String, params: Any?, matcher: Matcher): Flux<ByteArray> {
val up = upstream.getUpstreams().shuffled().first { matcher.matches(it) } as GenericUpstream
return up.getIngressSubscription().get<ByteArray>(topic)?.connect(matcher) ?: Flux.empty()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.emeraldpay.dshackle.upstream.polkadot

import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.SubscriptionConnect
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.generic.subscribe.GenericPersistentConnect
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

class PolkadotIngressSubscription(val conn: WsSubscriptions) : IngressSubscription {
override fun getAvailableTopics(): List<String> {
return emptyList() // not used now
}

@Suppress("UNCHECKED_CAST")
override fun <T> get(topic: String): SubscriptionConnect<T> {
return PolkaConnect(conn, topic) as SubscriptionConnect<T>
}
}

class PolkaConnect(
val conn: WsSubscriptions,
val topic: String,
) : GenericPersistentConnect() {

@Suppress("UNCHECKED_CAST")
override fun createConnection(): Flux<Any> {
return conn.subscribe(JsonRpcRequest(topic, listOf()))
.data
.timeout(Duration.ofSeconds(60), Mono.empty())
.onErrorResume { Mono.empty() } as Flux<Any>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@ import io.emeraldpay.dshackle.upstream.CachingReader
import io.emeraldpay.dshackle.upstream.EgressSubscription
import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.IngressSubscription
import io.emeraldpay.dshackle.upstream.LabelsDetector
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.NoIngressSubscription
import io.emeraldpay.dshackle.upstream.NoopCachingReader
import io.emeraldpay.dshackle.upstream.Upstream
import io.emeraldpay.dshackle.upstream.UpstreamValidator
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions
import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder
import io.emeraldpay.dshackle.upstream.generic.ChainSpecific
import io.emeraldpay.dshackle.upstream.generic.GenericUpstream
Expand Down Expand Up @@ -91,6 +94,10 @@ object StarknetChainSpecific : ChainSpecific {
override fun subscriptionTopics(upstream: GenericUpstream): List<String> {
return emptyList()
}

override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription {
return NoIngressSubscription()
}
}

@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Loading

0 comments on commit 160e5ae

Please sign in to comment.