From 2e80da58df53f969c4d4dfa666b729f427bfd43a Mon Sep 17 00:00:00 2001 From: a10zn8 Date: Mon, 13 Nov 2023 17:01:58 +0300 Subject: [PATCH] solana support (#339) --- .../kotlin/chainsconfig.codegen.gradle.kts | 1 + emerald-grpc | 2 +- .../io/emeraldpay/dshackle/BlockchainType.kt | 2 +- foundation/src/main/resources/chains.yaml | 23 ++++ .../io/emeraldpay/dshackle/data/BlockId.kt | 6 + .../dshackle/rpc/NativeSubscribe.kt | 7 +- .../dshackle/rpc/SubscribeNodeStatus.kt | 1 - .../configure/GenericUpstreamCreator.kt | 1 - .../dshackle/upstream/CallTargetsHolder.kt | 2 + .../dshackle/upstream/DefaultSolanaMethods.kt | 110 +++++++++++++++ .../dshackle/upstream/IngressSubscription.kt | 2 +- .../dshackle/upstream/Multistream.kt | 2 +- .../upstream/NoIngressSubscription.kt | 2 +- .../emeraldpay/dshackle/upstream/Upstream.kt | 1 - .../upstream/bitcoin/BitcoinRpcUpstream.kt | 4 - .../upstream/calls/DefaultPolkadotMethods.kt | 11 +- .../ethereum/EthereumChainSpecific.kt | 15 +-- .../upstream/ethereum/GenericWsHead.kt | 2 +- .../EthereumDshackleIngressSubscription.kt | 56 -------- .../EthereumWsIngressSubscription.kt | 2 +- .../upstream/generic/AbstractChainSpecific.kt | 79 +++++++++++ .../upstream/generic/ChainSpecific.kt | 9 +- .../generic/GenericEgressSubscription.kt | 26 ++++ .../dshackle/upstream/generic/GenericHead.kt | 5 +- .../GenericIngressSubscription.kt} | 23 +++- .../upstream/generic/GenericMultistream.kt | 28 +++- .../upstream/generic/GenericUpstream.kt | 5 - .../generic/connectors/GenericRpcConnector.kt | 15 ++- .../upstream/grpc/BitcoinGrpcUpstream.kt | 4 - .../upstream/grpc/GenericGrpcUpstream.kt | 12 +- .../polkadot/PolkadotChainSpecific.kt | 22 +-- .../polkadot/PolkadotEgressSubscription.kt | 23 ---- .../upstream/rpcclient/JsonRpcResponse.kt | 5 +- .../upstream/solana/SolanaChainSpecific.kt | 127 ++++++++++++++++++ .../starknet/StarknetChainSpecific.kt | 74 +--------- .../dshackle/rpc/NativeSubscribeSpec.groovy | 16 ++- .../dshackle/test/GenericUpstreamMock.groovy | 1 - .../dshackle/upstream/FilteredApisSpec.groovy | 1 - .../solana/SolanaChainSpecificTest.kt | 35 +++++ 39 files changed, 513 insertions(+), 249 deletions(-) create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultSolanaMethods.kt delete mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumDshackleIngressSubscription.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt rename src/main/kotlin/io/emeraldpay/dshackle/upstream/{polkadot/PolkadotIngressSubscription.kt => generic/GenericIngressSubscription.kt} (52%) delete mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt create mode 100644 src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt diff --git a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts index bdb8aab79..f616ed3a0 100644 --- a/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts +++ b/buildSrc/src/main/kotlin/chainsconfig.codegen.gradle.kts @@ -117,6 +117,7 @@ open class CodeGen(private val config: ChainsConfig) { "bitcoin" -> "BlockchainType.BITCOIN" "starknet" -> "BlockchainType.STARKNET" "polkadot" -> "BlockchainType.POLKADOT" + "solana" -> "BlockchainType.SOLANA" else -> throw IllegalArgumentException("unknown blockchain type $type") } } diff --git a/emerald-grpc b/emerald-grpc index 3a98f870f..76d95358e 160000 --- a/emerald-grpc +++ b/emerald-grpc @@ -1 +1 @@ -Subproject commit 3a98f870ff6098c62297f59761514e1e3c0b7783 +Subproject commit 76d95358e71835569e10efc2ee68babf2fdddbf2 diff --git a/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt b/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt index 3882ffece..5c20c906e 100644 --- a/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt +++ b/foundation/src/main/kotlin/io/emeraldpay/dshackle/BlockchainType.kt @@ -1,5 +1,5 @@ package io.emeraldpay.dshackle enum class BlockchainType { - UNKNOWN, BITCOIN, ETHEREUM, STARKNET, POLKADOT; + UNKNOWN, BITCOIN, ETHEREUM, STARKNET, POLKADOT, SOLANA; } diff --git a/foundation/src/main/resources/chains.yaml b/foundation/src/main/resources/chains.yaml index 33fe1387d..1044e91b3 100644 --- a/foundation/src/main/resources/chains.yaml +++ b/foundation/src/main/resources/chains.yaml @@ -664,3 +664,26 @@ chain-settings: short-names: [ vara-testnet ] chain-id: 0x0 grpcId: 10036 + - id: solana + label: solana + type: solana + settings: + expected-block-time: 1s + options: + validate-peers: false + lags: + syncing: 20 + lagging: 10 + chains: + - id: Mainnet + priority: 1 + code: SOLANA_MAINNET + short-names: [ solana ] + chain-id: 0x0 + grpcId: 1028 + - id: Testnet + priority: 1 + code: SOLANA_TESTMET + short-names: [ solana-testnet ] + chain-id: 0x0 + grpcId: 10037 diff --git a/src/main/kotlin/io/emeraldpay/dshackle/data/BlockId.kt b/src/main/kotlin/io/emeraldpay/dshackle/data/BlockId.kt index d9d0ebf9e..8031a4340 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/data/BlockId.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/data/BlockId.kt @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.data import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.etherjar.domain.BlockHash import org.bouncycastle.util.encoders.Hex +import java.util.Base64 class BlockId( value: ByteArray, @@ -55,5 +56,10 @@ class BlockId( val bytes = Hex.decode(even) return BlockId(bytes) } + + fun fromBase64(id: String): BlockId { + val bytes = Base64.getDecoder().decode(id) + return BlockId(bytes) + } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt index f5676c94c..ee024bbde 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/NativeSubscribe.kt @@ -74,7 +74,12 @@ open class NativeSubscribe( val publisher = multistream.tryProxySubscribe(matcher, request) ?: run { val method = request.method val params: Any? = request.payload?.takeIf { !it.isEmpty }?.let { - objectMapper.readValue(it.newInput(), Map::class.java) + val raw = it.toStringUtf8() + if (raw.startsWith("{")) { + objectMapper.readValue(it.newInput(), Map::class.java) + } else { + objectMapper.readValue(it.newInput(), List::class.java) + } } subscribe(chain, method, params, matcher) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt index a7d92a27c..b81c99fa8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt @@ -164,7 +164,6 @@ class SubscribeNodeStatus( .build() }, ) - .addAllSupportedSubscriptions(up.getSubscriptionTopics()) .addAllSupportedMethods(up.getMethods().getSupportedMethods()) (up as? GrpcUpstream)?.let { it.getBuildInfo().version?.let { version -> diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt index bb0c03762..9d13939a3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/configure/GenericUpstreamCreator.kt @@ -85,7 +85,6 @@ open class GenericUpstreamCreator( connectorFactory, cs::validator, cs::labelDetector, - cs::subscriptionTopics, ) upstream.start() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt index de90454ec..3f2e51af1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CallTargetsHolder.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.BlockchainType.ETHEREUM import io.emeraldpay.dshackle.BlockchainType.POLKADOT +import io.emeraldpay.dshackle.BlockchainType.SOLANA import io.emeraldpay.dshackle.BlockchainType.STARKNET import io.emeraldpay.dshackle.BlockchainType.UNKNOWN import io.emeraldpay.dshackle.Chain @@ -27,6 +28,7 @@ class CallTargetsHolder { ETHEREUM -> DefaultEthereumMethods(chain) STARKNET -> DefaultStarknetMethods(chain) POLKADOT -> DefaultPolkadotMethods() + SOLANA -> DefaultSolanaMethods() UNKNOWN -> throw IllegalArgumentException("unknown chain") } callTargets[chain] = created diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultSolanaMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultSolanaMethods.kt new file mode 100644 index 000000000..bf7c504ea --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultSolanaMethods.kt @@ -0,0 +1,110 @@ +package io.emeraldpay.dshackle.upstream + +import io.emeraldpay.dshackle.quorum.AlwaysQuorum +import io.emeraldpay.dshackle.quorum.BroadcastQuorum +import io.emeraldpay.dshackle.quorum.CallQuorum +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.etherjar.rpc.RpcException + +class DefaultSolanaMethods : CallMethods { + companion object { + val subs = setOf( + "accountSubscribe" to "accountUnsubscribe", + "blockSubscribe" to "blockUnsubscribe", + "logsSubscribe" to "logsUnsubscribe", + "programSubscribe" to "programUnsubscribe", + "signatureSubscribe" to "signatureUnsubscribe", + "slotSubscribe" to "slotUnsubscribe", + ) + } + + private val all = setOf( + "getAccountInfo", + "getBalance", + "getBlock", + "getBlockHeight", + "getBlockProduction", + "getBlockCommitment", + "getBlocks", + "getBlocksWithLimit", + "getBlockTime", + "getClusterNodes", + "getEpochInfo", + "getEpochSchedule", + "getFeeForMessage", + "getFirstAvailableBlock", + "getGenesisHash", + "getHealth", + "getHighestSnapshotSlot", + "getInflationGovernor", + "getInflationRate", + "getInflationReward", + "getLargestAccounts", + "getLatestBlockhash", + "getLeaderSchedule", + "getMaxRetransmitSlot", + "getMaxShredInsertSlot", + "getMinimumBalanceForRentExemption", + "getMultipleAccounts", + "getProgramAccounts", + "getRecentPerformanceSamples", + "getRecentPrioritizationFees", + "getSignaturesForAddress", + "getSignatureStatuses", + "getSlot", + "getSlotLeader", + "getSlotLeaders", + "getStakeActivation", + "getStakeMinimumDelegation", + "getSupply", + "getTokenAccountBalance", + "getTokenAccountsByDelegate", + "getTokenAccountsByOwner", + "getTokenLargestAccounts", + "getTokenSupply", + "getTransaction", + "getTransactionCount", + "getVersion", + "getVoteAccounts", + "isBlockhashValid", + "minimumLedgerSlot", + "requestAirdrop", + "simulateTransaction", + ) + + private val add = setOf( + "sendTransaction", + ) + + private val allowedMethods: Set = all + add + + override fun createQuorumFor(method: String): CallQuorum { + return when { + add.contains(method) -> BroadcastQuorum() + all.contains(method) -> AlwaysQuorum() + else -> AlwaysQuorum() + } + } + + override fun isCallable(method: String): Boolean { + return allowedMethods.contains(method) + } + + override fun isHardcoded(method: String): Boolean { + return false + } + + override fun executeHardcoded(method: String): ByteArray { + throw RpcException(-32601, "Method not found") + } + + override fun getGroupMethods(groupName: String): Set = + when (groupName) { + "default" -> getSupportedMethods() + else -> emptyList() + }.toSet() + + override fun getSupportedMethods(): Set { + return allowedMethods.toSortedSet() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/IngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/IngressSubscription.kt index d597587d6..462b986b6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/IngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/IngressSubscription.kt @@ -21,5 +21,5 @@ package io.emeraldpay.dshackle.upstream interface IngressSubscription { fun getAvailableTopics(): List - fun get(topic: String): SubscriptionConnect? + fun get(topic: String, params: Any?): SubscriptionConnect? } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 72a9dc01d..867cbd675 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -94,7 +94,7 @@ abstract class Multistream( .multicast() .directBestEffort() - override fun getSubscriptionTopics(): List { + fun getSubscriptionTopics(): List { return getEgressSubscription().getAvailableTopics() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/NoIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/NoIngressSubscription.kt index 9fe184a7f..bafc53e72 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/NoIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/NoIngressSubscription.kt @@ -25,7 +25,7 @@ open class NoIngressSubscription : IngressSubscription { return listOf() } - override fun get(topic: String): SubscriptionConnect? { + override fun get(topic: String, params: Any?): SubscriptionConnect? { return null } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 4976303c4..0a7d1fb21 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -41,7 +41,6 @@ interface Upstream : Lifecycle { fun getLag(): Long? fun getLabels(): Collection fun getMethods(): CallMethods - fun getSubscriptionTopics(): List fun getId(): String fun getCapabilities(): Set fun isGrpc(): Boolean diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt index 18c05c95c..fa0b4922b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt @@ -59,10 +59,6 @@ open class BitcoinRpcUpstream( return directApi } - override fun getSubscriptionTopics(): List { - return listOf() - } - override fun getLabels(): Collection { return listOf(UpstreamsConfig.Labels()) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt index 467206b63..cdb736669 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/calls/DefaultPolkadotMethods.kt @@ -29,12 +29,11 @@ class DefaultPolkadotMethods : CallMethods { companion object { val subs = setOf( - Pair("subscribe_newHead", "unsubscribe_newHead"), - Pair("chain_subscribeAllHeads", "chain_unsubscribeAllHeads"), - Pair("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads"), - Pair("chain_subscribeNewHeads", "chain_unsubscribeNewHeads"), - Pair("chain_subscribeRuntimeVersion", "chain_unsubscribeNewHeads"), - Pair("chain_subscribeRuntimeVersion", "chain_unsubscribeRuntimeVersion"), + "subscribe_newHead" to "unsubscribe_newHead", + "chain_subscribeAllHeads" to "chain_unsubscribeAllHeads", + "chain_subscribeFinalizedHeads" to "chain_unsubscribeFinalizedHeads", + "chain_subscribeNewHeads" to "chain_unsubscribeNewHeads", + "chain_subscribeRuntimeVersion" to "chain_unsubscribeRuntimeVersion", ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 105874ed2..648d9dd76 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -7,7 +7,6 @@ import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.foundation.ChainOptions.Options import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.CachingReader -import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.EgressSubscription import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription @@ -23,15 +22,15 @@ import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumLabelsDetector import io.emeraldpay.dshackle.upstream.ethereum.subscribe.EthereumWsIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.subscribe.NoPendingTxes import io.emeraldpay.dshackle.upstream.ethereum.subscribe.PendingTxesSource +import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder -import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import org.springframework.cloud.sleuth.Tracer import reactor.core.publisher.Mono import reactor.core.scheduler.Scheduler -object EthereumChainSpecific : ChainSpecific { +object EthereumChainSpecific : AbstractPollChainSpecific() { override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { return BlockContainer.fromEthereumJson(data, upstreamId) } @@ -58,6 +57,7 @@ object EthereumChainSpecific : ChainSpecific { val pendingTxes: PendingTxesSource = (ms.getAll()) .filter { it is GenericUpstream } .map { it as GenericUpstream } + .filter { it.getIngressSubscription() is EthereumIngressSubscription } .mapNotNull { (it.getIngressSubscription() as EthereumIngressSubscription).getPendingTxes() }.let { @@ -90,15 +90,6 @@ object EthereumChainSpecific : ChainSpecific { return EthereumLabelsDetector(reader, chain) } - override fun subscriptionTopics(upstream: GenericUpstream): List { - val subs = if (upstream.getCapabilities().contains(Capability.WS_HEAD)) { - listOf(EthereumEgressSubscription.METHOD_NEW_HEADS, EthereumEgressSubscription.METHOD_LOGS) - } else { - listOf() - } - return upstream.getIngressSubscription().getAvailableTopics().plus(subs).toSet().toList() - } - override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { return EthereumWsIngressSubscription(ws) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index 7615cca4d..05694f241 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -93,7 +93,7 @@ class GenericWsHead( } .timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads"))) .onErrorResume { - log.error("Error getting heads for $upstreamId - ${it.message}") + log.error("Error getting heads for $upstreamId", it) subscribed = false unsubscribe() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumDshackleIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumDshackleIngressSubscription.kt deleted file mode 100644 index f68871994..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumDshackleIngressSubscription.kt +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright (c) 2022 EmeraldPay, Inc - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.emeraldpay.dshackle.upstream.ethereum.subscribe - -import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.ReactorBlockchainGrpc -import io.emeraldpay.dshackle.Chain -import io.emeraldpay.dshackle.upstream.IngressSubscription -import io.emeraldpay.dshackle.upstream.SubscriptionConnect -import io.emeraldpay.dshackle.upstream.ethereum.EthereumEgressSubscription -import io.emeraldpay.dshackle.upstream.ethereum.EthereumIngressSubscription -import org.slf4j.LoggerFactory - -class EthereumDshackleIngressSubscription( - private val blockchain: Chain, - private val conn: ReactorBlockchainGrpc.ReactorBlockchainStub, -) : IngressSubscription, EthereumIngressSubscription { - - companion object { - private val log = LoggerFactory.getLogger(EthereumDshackleIngressSubscription::class.java) - } - - private val pendingTxes = DshacklePendingTxesSource(blockchain, conn) - - override fun getAvailableTopics(): List { - return listOf(EthereumEgressSubscription.METHOD_PENDING_TXES) - } - - override fun get(topic: String): SubscriptionConnect? { - if (topic == EthereumEgressSubscription.METHOD_PENDING_TXES) { - return pendingTxes as SubscriptionConnect - } - return null - } - - fun update(conf: BlockchainOuterClass.DescribeChain) { - pendingTxes.update(conf) - } - - override fun getPendingTxes(): PendingTxesSource? { - return pendingTxes - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumWsIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumWsIngressSubscription.kt index 29400c287..9f2a6f5cd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumWsIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/EthereumWsIngressSubscription.kt @@ -32,7 +32,7 @@ class EthereumWsIngressSubscription( } @Suppress("UNCHECKED_CAST") - override fun get(topic: String): SubscriptionConnect? { + override fun get(topic: String, params: Any?): SubscriptionConnect? { if (topic == EthereumEgressSubscription.METHOD_PENDING_TXES) { return pendingTxes as SubscriptionConnect } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt new file mode 100644 index 000000000..840e24776 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/AbstractChainSpecific.kt @@ -0,0 +1,79 @@ +package io.emeraldpay.dshackle.upstream.generic + +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.cache.Caches +import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.foundation.ChainOptions.Options +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.CachingReader +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.IngressSubscription +import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.NoIngressSubscription +import io.emeraldpay.dshackle.upstream.NoopCachingReader +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamValidator +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.calls.CallSelector +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import org.springframework.cloud.sleuth.Tracer +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler + +abstract class AbstractChainSpecific : ChainSpecific { + + override fun localReaderBuilder( + cachingReader: CachingReader, + methods: CallMethods, + head: Head, + ): Mono { + return Mono.just(LocalReader(methods)) + } + + override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { + return { _, _, _ -> NoopCachingReader } + } + + override fun validator( + chain: Chain, + upstream: Upstream, + options: Options, + config: ChainConfig, + ): UpstreamValidator? { + return null + } + + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { + return null + } + + override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { + return NoIngressSubscription() + } + + override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { + return { _ -> EmptyEgressSubscription } + } + + override fun callSelector(caches: Caches): CallSelector? { + return null + } +} + +abstract class AbstractPollChainSpecific : AbstractChainSpecific() { + + override fun getLatestBlock(api: JsonRpcReader, upstreamId: String): Mono { + return api.read(latestBlockRequest()).map { + parseBlock(it.getResult(), upstreamId) + } + } + + abstract fun latestBlockRequest(): JsonRpcRequest + + abstract fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index 00abe5826..c613c88a5 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -3,6 +3,7 @@ package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.BlockchainType.BITCOIN import io.emeraldpay.dshackle.BlockchainType.ETHEREUM import io.emeraldpay.dshackle.BlockchainType.POLKADOT +import io.emeraldpay.dshackle.BlockchainType.SOLANA import io.emeraldpay.dshackle.BlockchainType.STARKNET import io.emeraldpay.dshackle.BlockchainType.UNKNOWN import io.emeraldpay.dshackle.Chain @@ -25,6 +26,7 @@ import io.emeraldpay.dshackle.upstream.ethereum.EthereumChainSpecific import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.polkadot.PolkadotChainSpecific import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.solana.SolanaChainSpecific import io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific import org.apache.commons.collections4.Factory import org.springframework.cloud.sleuth.Tracer @@ -36,11 +38,9 @@ typealias LocalReaderBuilder = (CachingReader, CallMethods, Head) -> Mono) -> CachingReader interface ChainSpecific { - fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer - fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer - fun latestBlockRequest(): JsonRpcRequest + fun getLatestBlock(api: JsonRpcReader, upstreamId: String): Mono fun listenNewHeadsRequest(): JsonRpcRequest @@ -56,8 +56,6 @@ interface ChainSpecific { fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? - fun subscriptionTopics(upstream: GenericUpstream): List - fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription fun callSelector(caches: Caches): CallSelector? @@ -71,6 +69,7 @@ object ChainSpecificRegistry { ETHEREUM -> EthereumChainSpecific STARKNET -> StarknetChainSpecific POLKADOT -> PolkadotChainSpecific + SOLANA -> SolanaChainSpecific BITCOIN -> throw IllegalArgumentException("bitcoin should use custom streams implementation") UNKNOWN -> throw IllegalArgumentException("unknown chain") } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt new file mode 100644 index 000000000..de1363e25 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericEgressSubscription.kt @@ -0,0 +1,26 @@ +package io.emeraldpay.dshackle.upstream.generic + +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.Selector.Matcher +import reactor.core.publisher.Flux +import reactor.core.scheduler.Scheduler + +class GenericEgressSubscription( + val multistream: Multistream, + val scheduler: Scheduler, + val methods: List, +) : EgressSubscription { + override fun getAvailableTopics(): List { + return methods + } + + override fun subscribe(topic: String, params: Any?, matcher: Matcher): Flux { + val up = multistream.getUpstreams() + .filter { it.isAvailable() } + .shuffled() + .first { matcher.matches(it) } as GenericUpstream + + return up.getIngressSubscription().get(topic, params)?.connect(matcher) ?: Flux.empty() + } +} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt index cb2ef1ea8..ecf4e33e0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericHead.kt @@ -34,12 +34,9 @@ open class GenericHead( ) : Head, AbstractHead(forkChoice, headScheduler, blockValidator, 60_000, upstreamId) { fun getLatestBlock(api: JsonRpcReader): Mono { - return api.read(chainSpecific.latestBlockRequest()) + return chainSpecific.getLatestBlock(api, upstreamId) .subscribeOn(headScheduler) .timeout(Defaults.timeout, Mono.error(Exception("Block data not received"))) - .map { - chainSpecific.parseBlock(it.getResult(), upstreamId) - } .onErrorResume { err -> log.error("Failed to fetch latest block: ${err.message} $upstreamId", err) Mono.empty() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotIngressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt similarity index 52% rename from src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotIngressSubscription.kt rename to src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt index bcccde21c..d3abbde99 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotIngressSubscription.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericIngressSubscription.kt @@ -1,4 +1,4 @@ -package io.emeraldpay.dshackle.upstream.polkadot +package io.emeraldpay.dshackle.upstream.generic import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.SubscriptionConnect @@ -8,28 +8,39 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.time.Duration +import java.util.concurrent.ConcurrentHashMap -class PolkadotIngressSubscription(val conn: WsSubscriptions) : IngressSubscription { +class GenericIngressSubscription(val conn: WsSubscriptions) : IngressSubscription { override fun getAvailableTopics(): List { return emptyList() // not used now } + private val holders = ConcurrentHashMap, SubscriptionConnect>() + @Suppress("UNCHECKED_CAST") - override fun get(topic: String): SubscriptionConnect { - return PolkaConnect(conn, topic) as SubscriptionConnect + override fun get(topic: String, params: Any?): SubscriptionConnect { + return holders.computeIfAbsent(topic to params, { key -> GenericSubscriptionConnect(conn, key.first, key.second) }) as SubscriptionConnect } } -class PolkaConnect( +class GenericSubscriptionConnect( val conn: WsSubscriptions, val topic: String, + val params: Any?, ) : GenericPersistentConnect() { @Suppress("UNCHECKED_CAST") override fun createConnection(): Flux { - return conn.subscribe(JsonRpcRequest(topic, listOf())) + return conn.subscribe(JsonRpcRequest(topic, getParams(params))) .data .timeout(Duration.ofSeconds(60), Mono.empty()) .onErrorResume { Mono.empty() } as Flux } + + private fun getParams(params: Any?): List { + if (params == null) { + return listOf() + } + return params as List + } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt index 31173f9cd..4d359a12f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt @@ -20,7 +20,9 @@ import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.CachingReader import io.emeraldpay.dshackle.upstream.DistanceExtractor import io.emeraldpay.dshackle.upstream.DynamicMergedHead @@ -35,8 +37,11 @@ import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.Selector.Matcher import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.calls.CallSelector +import io.emeraldpay.dshackle.upstream.ethereum.EnrichedMergedHead +import io.emeraldpay.dshackle.upstream.ethereum.EthereumCachingReader import io.emeraldpay.dshackle.upstream.forkchoice.PriorityForkChoice import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstream +import io.emeraldpay.etherjar.domain.BlockHash import org.springframework.util.ConcurrentReferenceHashMap import org.springframework.util.ConcurrentReferenceHashMap.ReferenceType.WEAK import reactor.core.publisher.Flux @@ -146,9 +151,26 @@ open class GenericMultistream( return head } - override fun getEnrichedHead(mather: Matcher): Head { - return getHead() - } + override fun getEnrichedHead(mather: Selector.Matcher): Head = + filteredHeads.computeIfAbsent(mather.describeInternal().intern()) { _ -> + upstreams.filter { mather.matches(it) } + .apply { + log.debug("Found $size upstreams matching [${mather.describeInternal()}]") + }.let { + val selected = it.map { source -> source.getHead() } + EnrichedMergedHead( + selected, + getHead(), + headScheduler, + object : + Reader { + override fun read(key: BlockHash): Mono { + return (cachingReader as EthereumCachingReader).blocksByHashAsCont().read(key).map { res -> res.data } + } + }, + ) + } + } override fun getLabels(): Collection { return upstreams.flatMap { it.getLabels() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index 97e5c0cd2..71698a098 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -40,7 +40,6 @@ open class GenericUpstream( connectorFactory: ConnectorFactory, validatorBuilder: UpstreamValidatorBuilder, labelsDetectorBuilder: LabelsDetectorBuilder, - private val subscriptionTopics: (GenericUpstream) -> List, ) : DefaultUpstream(id, hash, null, UpstreamAvailability.OK, options, role, targets, node, chainConfig), Lifecycle { private val validator: UpstreamValidator? = validatorBuilder(chain, this, getOptions(), chainConfig) @@ -64,10 +63,6 @@ open class GenericUpstream( return node?.let { listOf(it.labels) } ?: emptyList() } - override fun getSubscriptionTopics(): List { - return subscriptionTopics(this) - } - // outdated, looks like applicable only for bitcoin and our ws_head trick override fun getCapabilities(): Set { return if (hasLiveSubscriptionHead.get()) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 30001a93d..d29c80858 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -10,11 +10,12 @@ import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.IngressSubscription import io.emeraldpay.dshackle.upstream.Lifecycle import io.emeraldpay.dshackle.upstream.MergedHead +import io.emeraldpay.dshackle.upstream.NoIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.GenericWsHead import io.emeraldpay.dshackle.upstream.ethereum.HeadLivenessValidator -import io.emeraldpay.dshackle.upstream.ethereum.NoEthereumIngressSubscription import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPool import io.emeraldpay.dshackle.upstream.ethereum.WsConnectionPoolFactory +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptionsImpl import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice @@ -41,10 +42,12 @@ class GenericRpcConnector( headScheduler: Scheduler, headLivenessScheduler: Scheduler, expectedBlockTime: Duration, - chainSpecific: ChainSpecific, + private val chainSpecific: ChainSpecific, ) : GenericConnector, CachesEnabled { private val id = upstream.getId() private val pool: WsConnectionPool? + private val wsSubs: WsSubscriptions? + private val ingressSubscription: IngressSubscription? private val head: Head private val liveness: HeadLivenessValidator @@ -58,6 +61,8 @@ class GenericRpcConnector( init { pool = wsFactory?.create(upstream) + wsSubs = pool?.let { WsSubscriptionsImpl(it) } + ingressSubscription = wsSubs?.let { chainSpecific.makeIngressSubscription(it) } head = when (connectorType) { RPC_ONLY -> { @@ -83,7 +88,7 @@ class GenericRpcConnector( AlwaysForkChoice(), blockValidator, getIngressReader(), - WsSubscriptionsImpl(pool!!), + wsSubs!!, wsConnectionResubscribeScheduler, headScheduler, upstream, @@ -108,7 +113,7 @@ class GenericRpcConnector( AlwaysForkChoice(), blockValidator, getIngressReader(), - WsSubscriptionsImpl(pool!!), + wsSubs!!, wsConnectionResubscribeScheduler, headScheduler, upstream, @@ -152,7 +157,7 @@ class GenericRpcConnector( } override fun getIngressSubscription(): IngressSubscription { - return NoEthereumIngressSubscription.DEFAULT + return ingressSubscription ?: NoIngressSubscription() } override fun getHead(): Head { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt index 81db028e6..471c65766 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt @@ -89,10 +89,6 @@ class BitcoinGrpcUpstream( block } - override fun getSubscriptionTopics(): List { - return listOf() - } - private val reloadBlock: Function> = Function { existingBlock -> // head comes without transaction data // need to download transactions for the block diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt index 56f291836..94dc08fb9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GenericGrpcUpstream.kt @@ -102,9 +102,6 @@ open class GenericGrpcUpstream( private val defaultReader: JsonRpcReader = client.getReader() - // private val ethereumSubscriptions = EthereumDshackleIngressSubscription(chain, remote) - private var subscriptionTopics = listOf() - override fun start() { } @@ -115,10 +112,6 @@ open class GenericGrpcUpstream( override fun stop() { } - override fun getSubscriptionTopics(): List { - return subscriptionTopics - } - override fun getBuildInfo(): BuildInfo { return buildInfo } @@ -130,11 +123,8 @@ open class GenericGrpcUpstream( val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also { capabilities = newCapabilities } - val subsChanged = (conf.supportedSubscriptionsList != subscriptionTopics).also { - subscriptionTopics = conf.supportedSubscriptionsList - } conf.status?.let { status -> onStatus(status) } - return buildInfoChanged || upstreamStatusChanged || subsChanged + return buildInfoChanged || upstreamStatusChanged } override fun getQuorumByLabel(): QuorumForLabels { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 3cfac6224..48d05ae9e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -4,7 +4,6 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonProperty import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId @@ -20,11 +19,12 @@ import io.emeraldpay.dshackle.upstream.NoopCachingReader import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.UpstreamValidator import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.calls.CallSelector +import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions +import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder -import io.emeraldpay.dshackle.upstream.generic.ChainSpecific -import io.emeraldpay.dshackle.upstream.generic.GenericUpstream +import io.emeraldpay.dshackle.upstream.generic.GenericEgressSubscription +import io.emeraldpay.dshackle.upstream.generic.GenericIngressSubscription import io.emeraldpay.dshackle.upstream.generic.LocalReader import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import org.springframework.cloud.sleuth.Tracer @@ -33,7 +33,7 @@ import reactor.core.scheduler.Scheduler import java.math.BigInteger import java.time.Instant -object PolkadotChainSpecific : ChainSpecific { +object PolkadotChainSpecific : AbstractPollChainSpecific() { override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { val response = Global.objectMapper.readValue(data, PolkadotBlockResponse::class.java) @@ -79,7 +79,7 @@ object PolkadotChainSpecific : ChainSpecific { } override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { - return { ms -> PolkadotEgressSubscription(ms, headScheduler) } + return { ms -> GenericEgressSubscription(ms, headScheduler, DefaultPolkadotMethods.subs.map { it.first }) } } override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { @@ -99,16 +99,8 @@ object PolkadotChainSpecific : ChainSpecific { return null } - override fun subscriptionTopics(upstream: GenericUpstream): List { - return emptyList() - } - override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { - return PolkadotIngressSubscription(ws) - } - - override fun callSelector(caches: Caches): CallSelector? { - return null + return GenericIngressSubscription(ws) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt deleted file mode 100644 index c4e8f3df1..000000000 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotEgressSubscription.kt +++ /dev/null @@ -1,23 +0,0 @@ -package io.emeraldpay.dshackle.upstream.polkadot - -import io.emeraldpay.dshackle.upstream.EgressSubscription -import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.Selector.Matcher -import io.emeraldpay.dshackle.upstream.calls.DefaultPolkadotMethods -import io.emeraldpay.dshackle.upstream.generic.GenericUpstream -import reactor.core.publisher.Flux -import reactor.core.scheduler.Scheduler - -class PolkadotEgressSubscription( - val upstream: Multistream, - val scheduler: Scheduler, -) : EgressSubscription { - override fun getAvailableTopics(): List { - return DefaultPolkadotMethods.subs.map { it.first } - } - - override fun subscribe(topic: String, params: Any?, matcher: Matcher): Flux { - val up = upstream.getUpstreams().shuffled().first { matcher.matches(it) } as GenericUpstream - return up.getIngressSubscription().get(topic)?.connect(matcher) ?: Flux.empty() - } -} diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcResponse.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcResponse.kt index 92e57ae8b..f6dd01c0f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcResponse.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/rpcclient/JsonRpcResponse.kt @@ -97,7 +97,10 @@ class JsonRpcResponse( if (str.startsWith("\"") && str.endsWith("\"")) { return str.substring(1, str.length - 1) } - throw IllegalStateException("Not as JS string") + if (str.all { it.isDigit() }) { + return str + } + throw IllegalStateException("Not as JS string - [$str]") } fun requireResult(): Mono { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt new file mode 100644 index 000000000..2e879e695 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecific.kt @@ -0,0 +1,127 @@ +package io.emeraldpay.dshackle.upstream.solana + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty +import io.emeraldpay.dshackle.Chain +import io.emeraldpay.dshackle.Global +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.reader.JsonRpcReader +import io.emeraldpay.dshackle.upstream.DefaultSolanaMethods +import io.emeraldpay.dshackle.upstream.EgressSubscription +import io.emeraldpay.dshackle.upstream.IngressSubscription +import io.emeraldpay.dshackle.upstream.LabelsDetector +import io.emeraldpay.dshackle.upstream.Multistream +import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions +import io.emeraldpay.dshackle.upstream.generic.AbstractChainSpecific +import io.emeraldpay.dshackle.upstream.generic.GenericEgressSubscription +import io.emeraldpay.dshackle.upstream.generic.GenericIngressSubscription +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import reactor.core.publisher.Mono +import reactor.core.scheduler.Scheduler +import java.math.BigInteger +import java.time.Instant + +object SolanaChainSpecific : AbstractChainSpecific() { + + override fun getLatestBlock(api: JsonRpcReader, upstreamId: String): Mono { + return api.read(JsonRpcRequest("getLatestBlockhash", listOf())).flatMap { + val response = Global.objectMapper.readValue(it.getResult(), SolanaLatest::class.java) + api.read( + JsonRpcRequest( + "getBlock", + listOf( + response.context.slot, + mapOf( + "showRewards" to false, + "transactionDetails" to "none", + "maxSupportedTransactionVersion" to 0, + ), + ), + ), + ).map { + val raw = it.getResult() + val block = Global.objectMapper.readValue(it.getResult(), SolanaBlock::class.java) + makeBlock(raw, block, upstreamId) + } + } + } + + override fun parseHeader(data: ByteArray, upstreamId: String): BlockContainer { + val res = Global.objectMapper.readValue(data, SolanaWrapper::class.java) + return makeBlock(data, res.value.block, upstreamId) + } + + private fun makeBlock(raw: ByteArray, block: SolanaBlock, upstreamId: String): BlockContainer { + return BlockContainer( + height = block.height, + hash = BlockId.fromBase64(block.hash), + difficulty = BigInteger.ZERO, + timestamp = Instant.ofEpochMilli(block.timestamp), + full = false, + json = raw, + parsed = block, + transactions = emptyList(), + upstreamId = upstreamId, + parentHash = BlockId.fromBase64(block.parent), + ) + } + + override fun listenNewHeadsRequest(): JsonRpcRequest { + return JsonRpcRequest( + "blockSubscribe", + listOf( + "all", + mapOf( + "showRewards" to false, + "transactionDetails" to "none", + ), + ), + ) + } + + override fun unsubscribeNewHeadsRequest(subId: String): JsonRpcRequest { + return JsonRpcRequest("blockUnsubscribe", listOf(subId)) + } + + override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { + return null + } + + override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { + return GenericIngressSubscription(ws) + } + + override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { + return { ms -> GenericEgressSubscription(ms, headScheduler, DefaultSolanaMethods.subs.map { it.first }) } + } +} + +@JsonIgnoreProperties(ignoreUnknown = true) +data class SolanaLatest( + @JsonProperty("context") var context: SolanaContext, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class SolanaWrapper( + @JsonProperty("context") var context: SolanaContext, + @JsonProperty("value") var value: SolanaResult, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class SolanaContext( + @JsonProperty("slot") var slot: Long, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class SolanaResult( + @JsonProperty("block") var block: SolanaBlock, +) + +@JsonIgnoreProperties(ignoreUnknown = true) +data class SolanaBlock( + @JsonProperty("blockHeight") var height: Long, + @JsonProperty("blockTime") var timestamp: Long, + @JsonProperty("blockhash") var hash: String, + @JsonProperty("previousBlockhash") var parent: String, +) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index d0a9020ba..6195a5352 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -2,40 +2,15 @@ package io.emeraldpay.dshackle.upstream.starknet import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.annotation.JsonProperty -import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.cache.Caches -import io.emeraldpay.dshackle.config.ChainsConfig.ChainConfig import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId -import io.emeraldpay.dshackle.foundation.ChainOptions.Options -import io.emeraldpay.dshackle.reader.JsonRpcReader -import io.emeraldpay.dshackle.upstream.CachingReader -import io.emeraldpay.dshackle.upstream.EgressSubscription -import io.emeraldpay.dshackle.upstream.EmptyEgressSubscription -import io.emeraldpay.dshackle.upstream.Head -import io.emeraldpay.dshackle.upstream.IngressSubscription -import io.emeraldpay.dshackle.upstream.LabelsDetector -import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.NoIngressSubscription -import io.emeraldpay.dshackle.upstream.NoopCachingReader -import io.emeraldpay.dshackle.upstream.Upstream -import io.emeraldpay.dshackle.upstream.UpstreamValidator -import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.calls.CallSelector -import io.emeraldpay.dshackle.upstream.ethereum.WsSubscriptions -import io.emeraldpay.dshackle.upstream.generic.CachingReaderBuilder -import io.emeraldpay.dshackle.upstream.generic.ChainSpecific -import io.emeraldpay.dshackle.upstream.generic.GenericUpstream -import io.emeraldpay.dshackle.upstream.generic.LocalReader +import io.emeraldpay.dshackle.upstream.generic.AbstractPollChainSpecific import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import org.springframework.cloud.sleuth.Tracer -import reactor.core.publisher.Mono -import reactor.core.scheduler.Scheduler import java.math.BigInteger import java.time.Instant -object StarknetChainSpecific : ChainSpecific { +object StarknetChainSpecific : AbstractPollChainSpecific() { override fun parseBlock(data: ByteArray, upstreamId: String): BlockContainer { val block = Global.objectMapper.readValue(data, StarknetBlock::class.java) @@ -57,9 +32,6 @@ object StarknetChainSpecific : ChainSpecific { throw NotImplementedError() } - override fun latestBlockRequest(): JsonRpcRequest = - JsonRpcRequest("starknet_getBlockWithTxHashes", listOf("latest")) - override fun listenNewHeadsRequest(): JsonRpcRequest { throw NotImplementedError() } @@ -68,46 +40,8 @@ object StarknetChainSpecific : ChainSpecific { throw NotImplementedError() } - override fun localReaderBuilder( - cachingReader: CachingReader, - methods: CallMethods, - head: Head, - ): Mono { - return Mono.just(LocalReader(methods)) - } - - override fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription { - return { _ -> EmptyEgressSubscription } - } - - override fun makeCachingReaderBuilder(tracer: Tracer): CachingReaderBuilder { - return { _, _, _ -> NoopCachingReader } - } - - override fun validator( - chain: Chain, - upstream: Upstream, - options: Options, - config: ChainConfig, - ): UpstreamValidator? { - return null - } - - override fun labelDetector(chain: Chain, reader: JsonRpcReader): LabelsDetector? { - return null - } - - override fun subscriptionTopics(upstream: GenericUpstream): List { - return emptyList() - } - - override fun makeIngressSubscription(ws: WsSubscriptions): IngressSubscription { - return NoIngressSubscription() - } - - override fun callSelector(caches: Caches): CallSelector? { - return null - } + override fun latestBlockRequest(): JsonRpcRequest = + JsonRpcRequest("starknet_getBlockWithTxHashes", listOf("latest")) } @JsonIgnoreProperties(ignoreUnknown = true) diff --git a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy index aa5059a90..2a2023b5a 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/rpc/NativeSubscribeSpec.groovy @@ -41,11 +41,11 @@ class NativeSubscribeSpec extends Specification { def subscribe = Mock(EthereumEgressSubscription) { 1 * it.subscribe("newHeads", null, _ as Selector.AnyLabelMatcher) >> Flux.just("{}") + 1 * it.getAvailableTopics() >> ["newHeads"] } def up = Mock(GenericMultistream) { 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> null - 1 * it.getEgressSubscription() >> subscribe - 1 * it.getSubscriptionTopics() >> ["newHeads"] + 2 * it.getEgressSubscription() >> subscribe } def nativeSubscribe = new NativeSubscribe(new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, up), signer) @@ -81,11 +81,11 @@ class NativeSubscribeSpec extends Specification { println("ok: $ok") ok }, _ as Selector.AnyLabelMatcher) >> Flux.just("{}") + 1 * it.getAvailableTopics() >> ["logs"] } def up = Mock(GenericMultistream) { 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> null - 1 * it.getEgressSubscription() >> subscribe - 1 * it.getSubscriptionTopics() >> ["logs"] + 2 * it.getEgressSubscription() >> subscribe } def nativeSubscribe = new NativeSubscribe(new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, up), signer) @@ -106,10 +106,14 @@ class NativeSubscribeSpec extends Specification { .setChainValue(Chain.ETHEREUM__MAINNET.id) .setMethod("newHeads") .build() + + def subscribe = Mock(EthereumEgressSubscription) { + 1 * it.getAvailableTopics() >> ["newHeads"] + } + def up = Mock(GenericMultistream) { 1 * it.tryProxySubscribe(_ as Selector.AnyLabelMatcher, call) >> Flux.just("{}") - 0 * it.getEgressSubscription() - 1 * it.getSubscriptionTopics() >> ["newHeads"] + 1 * it.getEgressSubscription() >> subscribe } def nativeSubscribe = new NativeSubscribe(new MultistreamHolderMock(Chain.ETHEREUM__MAINNET, up), signer) diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy index 02d660724..56da1932c 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/GenericUpstreamMock.groovy @@ -74,7 +74,6 @@ class GenericUpstreamMock extends GenericUpstream { new ConnectorFactoryMock(api, new EthereumHeadMock()), io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&validator, io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&labelDetector, - io.emeraldpay.dshackle.upstream.starknet.StarknetChainSpecific.INSTANCE.&subscriptionTopics, ) this.ethereumHeadMock = this.getHead() as EthereumHeadMock setLag(0) diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy index 0828cbe7f..1a2cad449 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy @@ -79,7 +79,6 @@ class FilteredApisSpec extends Specification { connectorFactory, cs.&validator, cs.&labelDetector, - cs.&subscriptionTopics, ) } def matcher = new Selector.LabelMatcher("test", ["foo"]) diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt new file mode 100644 index 000000000..30387a81f --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/solana/SolanaChainSpecificTest.kt @@ -0,0 +1,35 @@ +package io.emeraldpay.dshackle.upstream.solana + +import io.emeraldpay.dshackle.data.BlockId +import org.assertj.core.api.Assertions +import org.junit.jupiter.api.Test + +val example = """{ + "context": { + "slot": 112301554 + }, + "value": { + "slot": 112301554, + "block": { + "previousBlockhash": "GJp125YAN4ufCSUvZJVdCyWQJ7RPWMmwxoyUQySydZA", + "blockhash": "6ojMHjctdqfB55JDpEpqfHnP96fiaHEcvzEQ2NNcxzHP", + "parentSlot": 112301553, + "blockTime": 1639926816, + "blockHeight": 101210751 + }, + "err": null + } + } +""".trimIndent() +class SolanaChainSpecificTest { + + @Test + fun parseBlock() { + val result = SolanaChainSpecific.parseHeader(example.toByteArray(), "1") + + Assertions.assertThat(result.height).isEqualTo(101210751) + Assertions.assertThat(result.hash).isEqualTo(BlockId.fromBase64("6ojMHjctdqfB55JDpEpqfHnP96fiaHEcvzEQ2NNcxzHP")) + Assertions.assertThat(result.upstreamId).isEqualTo("1") + Assertions.assertThat(result.parentHash).isEqualTo(BlockId.fromBase64("GJp125YAN4ufCSUvZJVdCyWQJ7RPWMmwxoyUQySydZA")) + } +}