diff --git a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt index 78c55f931..d79ee7d9f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt @@ -23,5 +23,6 @@ class Defaults { companion object { val timeout: Duration = Duration.ofSeconds(60) val timeoutInternal: Duration = timeout.dividedBy(4) + val retryConnection: Duration = Duration.ofSeconds(10) } } \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/data/BlockContainer.kt b/src/main/kotlin/io/emeraldpay/dshackle/data/BlockContainer.kt index 5f4de9be5..86c2cf010 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/data/BlockContainer.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/data/BlockContainer.kt @@ -55,7 +55,7 @@ class BlockContainer( } @JvmStatic - fun from(raw: ByteArray): BlockContainer { + fun fromEthereumJson(raw: ByteArray): BlockContainer { val block = Global.objectMapper.readValue(raw, BlockJson::class.java) return from(block, raw) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 54f3673ff..0ce260cda 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -16,19 +16,16 @@ */ package io.emeraldpay.dshackle.startup -import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.FileResolver -import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.cache.CachesFactory import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream +import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream import io.emeraldpay.dshackle.upstream.calls.CallMethods import io.emeraldpay.dshackle.upstream.calls.ManagedCallMethods import io.emeraldpay.dshackle.upstream.ethereum.EthereumRpcUpstream -import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstream import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsFactory import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstreams import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcHttpClient @@ -146,7 +143,7 @@ open class ConfiguredUpstreams( } val methods = buildMethods(config, chain) - val upstream = BitcoinUpstream(config.id + val upstream = BitcoinRpcUpstream(config.id ?: "bitcoin-${seq.getAndIncrement()}", chain, directApi, options, config.role, QuorumForLabels.QuorumItem(1, config.labels), diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt index 66519ba78..c1d5b2da1 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt @@ -58,6 +58,20 @@ class QuorumForLabels() { return Collections.unmodifiableList(nodes) } + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is QuorumForLabels) return false + + if (nodes != other.nodes) return false + + return true + } + + override fun hashCode(): Int { + return nodes.hashCode() + } + + /** * Details for a single element (upstream, node or aggregation) */ @@ -67,6 +81,24 @@ class QuorumForLabels() { return QuorumItem(0, UpstreamsConfig.Labels()) } } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (other !is QuorumItem) return false + + if (quorum != other.quorum) return false + if (labels != other.labels) return false + + return true + } + + override fun hashCode(): Int { + var result = quorum + result = 31 * result + labels.hashCode() + return result + } + + } } \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt index 2a53ea298..db8c8c171 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/CurrentMultistreamHolder.kt @@ -16,14 +16,12 @@ */ package io.emeraldpay.dshackle.upstream -import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.dshackle.BlockchainType -import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.cache.CachesEnabled import io.emeraldpay.dshackle.cache.CachesFactory -import io.emeraldpay.dshackle.quorum.QuorumReaderFactory import io.emeraldpay.dshackle.startup.UpstreamChange import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream +import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods import io.emeraldpay.dshackle.upstream.calls.CallMethods diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 7947740ea..f443c2f95 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -16,6 +16,7 @@ */ package io.emeraldpay.dshackle.upstream +import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.calls.CallMethods @@ -46,6 +47,14 @@ abstract class DefaultUpstream( return getStatus() == UpstreamAvailability.OK } + fun onStatus(value: BlockchainOuterClass.ChainStatus) { + val available = value.availability + val quorum = value.quorum + setStatus( + if (available != null) UpstreamAvailability.fromGrpc(available.number) else UpstreamAvailability.UNAVAILABLE + ) + } + override fun getStatus(): UpstreamAvailability { return status.get().status } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt new file mode 100644 index 000000000..19e692261 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinRpcUpstream.kt @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.bitcoin + +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.reader.Reader +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.* +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import io.emeraldpay.grpc.Chain +import org.slf4j.LoggerFactory +import org.springframework.context.Lifecycle +import reactor.core.Disposable + +open class BitcoinRpcUpstream( + id: String, + chain: Chain, + private val directApi: Reader, + options: UpstreamsConfig.Options, + role: UpstreamsConfig.UpstreamRole, + node: QuorumForLabels.QuorumItem, + callMethods: CallMethods +) : BitcoinUpstream(id, chain, options, role, callMethods, node), Lifecycle { + + companion object { + private val log = LoggerFactory.getLogger(BitcoinRpcUpstream::class.java) + } + + private val head: Head = createHead() + private var validatorSubscription: Disposable? = null + + private fun createHead(): Head { + return BitcoinRpcHead( + directApi, + ExtractBlock() + ) + } + + override fun getHead(): Head { + return head + } + + override fun getApi(): Reader { + return directApi + } + + override fun getLabels(): Collection { + return listOf(UpstreamsConfig.Labels()) + } + + override fun cast(selfType: Class): T { + if (!selfType.isAssignableFrom(this.javaClass)) { + throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") + } + return this as T + } + + override fun isRunning(): Boolean { + var runningAny = validatorSubscription != null + if (head is Lifecycle) { + runningAny = runningAny || head.isRunning + } + return runningAny + } + + override fun start() { + log.info("Configured for ${chain.chainName}") + if (head is Lifecycle) { + if (!head.isRunning) { + head.start() + } + } + + validatorSubscription?.dispose() + + if (getOptions().disableValidation != null && getOptions().disableValidation!!) { + this.setLag(0) + this.setStatus(UpstreamAvailability.OK) + } else { + val validator = BitcoinUpstreamValidator(directApi, getOptions()) + validatorSubscription = validator.start() + .subscribe(this::setStatus) + } + } + + override fun stop() { + if (head is Lifecycle) { + head.stop() + } + validatorSubscription?.dispose() + } + +} \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt index c3cf873e0..229292466 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinUpstream.kt @@ -15,97 +15,30 @@ */ package io.emeraldpay.dshackle.upstream.bitcoin -import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.dshackle.config.UpstreamsConfig -import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.startup.QuorumForLabels -import io.emeraldpay.dshackle.upstream.* +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods import io.emeraldpay.grpc.Chain import org.slf4j.LoggerFactory -import org.springframework.context.Lifecycle -import reactor.core.Disposable -import reactor.core.publisher.Mono -open class BitcoinUpstream( +abstract class BitcoinUpstream( id: String, val chain: Chain, - private val directApi: Reader, options: UpstreamsConfig.Options, role: UpstreamsConfig.UpstreamRole, - node: QuorumForLabels.QuorumItem, - callMethods: CallMethods -) : DefaultUpstream(id, options, role, callMethods, node), Lifecycle { + callMethods: CallMethods, + node: QuorumForLabels.QuorumItem +) : DefaultUpstream(id, options, role, callMethods, node) { + + constructor(id: String, + chain: Chain, + options: UpstreamsConfig.Options, + role: UpstreamsConfig.UpstreamRole) : this(id, chain, options, role, DefaultBitcoinMethods(), QuorumForLabels.QuorumItem.empty()) companion object { private val log = LoggerFactory.getLogger(BitcoinUpstream::class.java) } - private val head: Head = createHead() - private var validatorSubscription: Disposable? = null - - private fun createHead(): Head { - return BitcoinRpcHead( - directApi, - ExtractBlock() - ) - } - - override fun getHead(): Head { - return head - } - - override fun getApi(): Reader { - return directApi - } - - override fun getLabels(): Collection { - return listOf(UpstreamsConfig.Labels()) - } - - override fun cast(selfType: Class): T { - if (!selfType.isAssignableFrom(this.javaClass)) { - throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") - } - return this as T - } - - override fun isRunning(): Boolean { - var runningAny = validatorSubscription != null - if (head is Lifecycle) { - runningAny = runningAny || head.isRunning - } - runningAny = runningAny - return runningAny - } - - override fun start() { - log.info("Configured for ${chain.chainName}") - if (head is Lifecycle) { - if (!head.isRunning) { - head.start() - } - } - - validatorSubscription?.dispose() - - if (getOptions().disableValidation != null && getOptions().disableValidation!!) { - this.setLag(0) - this.setStatus(UpstreamAvailability.OK) - } else { - val validator = BitcoinUpstreamValidator(directApi, getOptions()) - validatorSubscription = validator.start() - .subscribe(this::setStatus) - } - } - - override fun stop() { - if (head is Lifecycle) { - head.stop() - } - validatorSubscription?.dispose() - } - } \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcHead.kt index 20c7a35e0..5896d9e9a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumRpcHead.kt @@ -16,14 +16,12 @@ */ package io.emeraldpay.dshackle.upstream.ethereum -import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.infinitape.etherjar.hex.HexQuantity -import io.infinitape.etherjar.rpc.Commands import org.slf4j.LoggerFactory import org.springframework.context.Lifecycle import org.springframework.scheduling.concurrent.CustomizableThreadFactory @@ -71,7 +69,7 @@ class EthereumRpcHead( .timeout(Defaults.timeout, Mono.error(Exception("Block data not received"))) } .map { - BlockContainer.from(it.getResult()) + BlockContainer.fromEthereumJson(it.getResult()) } .onErrorContinue { err, _ -> log.debug("RPC error ${err.message}") diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt index eaf4e5aaf..50e4931c9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsFactory.kt @@ -92,7 +92,7 @@ class EthereumWsFactory( } } .flatMap(JsonRpcResponse::requireResult) - .map { BlockContainer.from(it) } + .map { BlockContainer.fromEthereumJson(it) } }.repeatWhenEmpty { n -> Repeat.times(5) .exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(500)) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt new file mode 100644 index 000000000..44a936d4d --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt @@ -0,0 +1,132 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.grpc + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.api.proto.ReactorBlockchainGrpc +import io.emeraldpay.dshackle.Defaults +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.data.BlockId +import io.emeraldpay.dshackle.reader.Reader +import io.emeraldpay.dshackle.upstream.Head +import io.emeraldpay.dshackle.upstream.Selector +import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream +import io.emeraldpay.dshackle.upstream.bitcoin.ExtractBlock +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse +import io.emeraldpay.grpc.Chain +import io.infinitape.etherjar.rpc.RpcException +import org.reactivestreams.Publisher +import org.slf4j.LoggerFactory +import org.springframework.context.Lifecycle +import reactor.core.publisher.Mono +import java.math.BigInteger +import java.time.Instant +import java.util.concurrent.TimeoutException +import java.util.function.Function + +class BitcoinGrpcUpstream( + private val parentId: String, + chain: Chain, + private val remote: ReactorBlockchainGrpc.ReactorBlockchainStub, + private val client: JsonRpcGrpcClient +) : BitcoinUpstream( + "$parentId/${chain.chainCode}", + chain, + UpstreamsConfig.Options.getDefaults(), + UpstreamsConfig.UpstreamRole.STANDARD +), GrpcUpstream, Lifecycle { + + companion object { + private val log = LoggerFactory.getLogger(BitcoinGrpcUpstream::class.java) + } + + private val extractBlock = ExtractBlock() + private val defaultReader: Reader = client.forSelector(Selector.empty) + private val blockConverter: Function = Function { value -> + val block = BlockContainer( + value.height, + BlockId.from(value.blockId), + BigInteger(1, value.weight.toByteArray()), + Instant.ofEpochMilli(value.timestamp), + false, + null, + null + ) + block + } + private val reloadBlock: Function> = Function { existingBlock -> + // head comes without transaction data + // need to download transactions for the block + defaultReader.read(JsonRpcRequest("getblock", listOf(existingBlock.hash.toHex()))) + .flatMap(JsonRpcResponse::requireResult) + .map(extractBlock::extract) + .timeout(timeout, Mono.error(TimeoutException("Timeout from upstream"))) + .doOnError { t -> + setStatus(UpstreamAvailability.UNAVAILABLE) + val msg = "Failed to download block data for chain $chain on $parentId" + if (t is RpcException || t is TimeoutException) { + log.warn("$msg. Message: ${t.message}") + } else { + log.error(msg, t) + } + } + } + private val upstreamStatus = GrpcUpstreamStatus() + private val grpcHead = GrpcHead(chain, this, blockConverter, reloadBlock) + var timeout = Defaults.timeout + + override fun getHead(): Head { + return grpcHead + } + + override fun getApi(): Reader { + return defaultReader + } + + override fun getLabels(): Collection { + return upstreamStatus.getLabels() + } + + override fun cast(selfType: Class): T { + if (!selfType.isAssignableFrom(this.javaClass)) { + throw ClassCastException("Cannot cast ${this.javaClass} to $selfType") + } + return this as T + } + + override fun isRunning(): Boolean { + return grpcHead.isRunning + } + + override fun start() { + grpcHead.start(remote) + } + + override fun stop() { + grpcHead.stop() + } + + override fun update(conf: BlockchainOuterClass.DescribeChain) { + upstreamStatus.update(conf) + conf.status?.let { status -> onStatus(status) } + } + +} \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt index 568e5218f..2cbb7956c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt @@ -16,9 +16,7 @@ */ package io.emeraldpay.dshackle.upstream.grpc -import com.salesforce.reactorgrpc.GrpcRetry import io.emeraldpay.api.proto.BlockchainOuterClass -import io.emeraldpay.api.proto.Common import io.emeraldpay.api.proto.ReactorBlockchainGrpc import io.emeraldpay.dshackle.Defaults import io.emeraldpay.dshackle.config.UpstreamsConfig @@ -28,7 +26,6 @@ import io.emeraldpay.dshackle.reader.Reader import io.emeraldpay.dshackle.startup.QuorumForLabels import io.emeraldpay.dshackle.upstream.* import io.emeraldpay.dshackle.upstream.calls.CallMethods -import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods import io.emeraldpay.dshackle.upstream.ethereum.* import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest @@ -36,167 +33,107 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.grpc.Chain import io.infinitape.etherjar.domain.BlockHash import io.infinitape.etherjar.rpc.* +import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.context.Lifecycle -import reactor.core.Disposable -import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.core.publisher.toMono import java.math.BigInteger -import java.time.Duration import java.time.Instant -import java.util.* import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.AtomicReference import java.util.function.Function -import kotlin.collections.ArrayList open class EthereumGrpcUpstream( private val parentId: String, private val chain: Chain, - private val blockchainStub: ReactorBlockchainGrpc.ReactorBlockchainStub, + private val remote: ReactorBlockchainGrpc.ReactorBlockchainStub, private val client: JsonRpcGrpcClient ) : EthereumUpstream( "$parentId/${chain.chainCode}", UpstreamsConfig.Options.getDefaults(), UpstreamsConfig.UpstreamRole.STANDARD, null, null -), Lifecycle { - - private var allLabels: Collection = ArrayList() - private val log = LoggerFactory.getLogger(EthereumGrpcUpstream::class.java) +), GrpcUpstream, Lifecycle { + + private val blockConverter: Function = Function { value -> + val block = BlockContainer( + value.height, + BlockId.from(BlockHash.from("0x" + value.blockId)), + BigInteger(1, value.weight.toByteArray()), + Instant.ofEpochMilli(value.timestamp), + false, + null, + null + ) + block + } - private val nodes = AtomicReference(QuorumForLabels()) - private val head = DefaultEthereumHead() - private var targets: CallMethods? = null - private var headSubscription: Disposable? = null + private val reloadBlock: Function> = Function { existingBlock -> + // head comes without transaction data + // need to download transactions for the block + defaultReader.read(JsonRpcRequest("eth_getBlockByHash", listOf(existingBlock.hash.toHexWithPrefix(), false))) + .flatMap(JsonRpcResponse::requireResult) + .map { + BlockContainer.fromEthereumJson(it) + } + .timeout(timeout, Mono.error(TimeoutException("Timeout from upstream"))) + .doOnError { t -> + setStatus(UpstreamAvailability.UNAVAILABLE) + val msg = "Failed to download block data for chain $chain on $parentId" + if (t is RpcException || t is TimeoutException) { + log.warn("$msg. Message: ${t.message}") + } else { + log.error(msg, t) + } + } + } - var timeout = Defaults.timeout + private val log = LoggerFactory.getLogger(EthereumGrpcUpstream::class.java) + private val upstreamStatus = GrpcUpstreamStatus() + private val grpcHead = GrpcHead(chain, this, blockConverter, reloadBlock) private val defaultReader: Reader = client.forSelector(Selector.empty) + var timeout = Defaults.timeout override fun start() { - if (this.isRunning) return - val chainRef = Common.Chain.newBuilder() - .setTypeValue(chain.id) - .build() - .toMono() - - val retry: Function, Flux> = Function { - setStatus(UpstreamAvailability.UNAVAILABLE) - blockchainStub.subscribeHead(chainRef) - } - - val flux = blockchainStub.subscribeHead(chainRef) - .compose(GrpcRetry.ManyToMany.retryAfter(retry, Duration.ofSeconds(5))) - observeHead(flux) + grpcHead.start(remote) } override fun isRunning(): Boolean { - return headSubscription != null + return grpcHead.isRunning } override fun stop() { - headSubscription?.dispose() - headSubscription = null + grpcHead.stop() } - - internal fun observeHead(flux: Flux) { - val base = flux.map { value -> - val block = BlockContainer( - value.height, - BlockId.from(BlockHash.from("0x" + value.blockId)), - BigInteger(1, value.weight.toByteArray()), - Instant.ofEpochMilli(value.timestamp), - false, - null, - null - ) - block - }.distinctUntilChanged { - it.hash - }.filter { block -> - val curr = head.getCurrent() - curr == null || curr.difficulty < block.difficulty - }.flatMap { - defaultReader.read(JsonRpcRequest("eth_getBlockByHash", listOf(it.hash.toHexWithPrefix(), false))) - .flatMap(JsonRpcResponse::requireResult) - .map { - BlockContainer.from(it) - } - .timeout(timeout, Mono.error(TimeoutException("Timeout from upstream"))) - .doOnError { t -> - setStatus(UpstreamAvailability.UNAVAILABLE) - val msg = "Failed to download block data for chain $chain on $parentId" - if (t is RpcException || t is TimeoutException) { - log.warn("$msg. Message: ${t.message}") - } else { - log.error(msg, t) - } - } - }.onErrorContinue { err, _ -> - log.error("Head subscription error. ${err.javaClass.name}:${err.message}", err) - }.doOnNext { - setStatus(UpstreamAvailability.OK) - } - - headSubscription = head.follow(base) - } - - fun init(conf: BlockchainOuterClass.DescribeChain) { - targets = DirectCallMethods(conf.supportedMethodsList.toSet()) - val nodes = QuorumForLabels() - val allLabels = ArrayList() - conf.nodesList.forEach { remoteNode -> - val node = QuorumForLabels.QuorumItem(remoteNode.quorum, - remoteNode.labelsList.let { provided -> - val labels = UpstreamsConfig.Labels() - provided.forEach { - labels[it.name] = it.value - } - allLabels.add(labels) - labels - } - ) - nodes.add(node) - } - this.nodes.set(nodes) - this.allLabels = Collections.unmodifiableCollection(allLabels) + override fun update(conf: BlockchainOuterClass.DescribeChain) { + upstreamStatus.update(conf) conf.status?.let { status -> onStatus(status) } } - fun onStatus(value: BlockchainOuterClass.ChainStatus) { - val available = value.availability - val quorum = value.quorum - setStatus( - if (available != null) UpstreamAvailability.fromGrpc(available.number) else UpstreamAvailability.UNAVAILABLE - ) - } - override fun getQuorumByLabel(): QuorumForLabels { - return nodes.get() + return upstreamStatus.getNodes() } // ------------------------------------------------------------------------------------------ override fun getLabels(): Collection { - return allLabels + return upstreamStatus.getLabels() } override fun getMethods(): CallMethods { - return targets ?: throw IllegalStateException("Upstream is not initialized yet") + return upstreamStatus.getCallMethods() } override fun isAvailable(): Boolean { - return super.isAvailable() && head.getCurrent() != null && nodes.get().getAll().any { + return super.isAvailable() && grpcHead.getCurrent() != null && getQuorumByLabel().getAll().any { it.quorum > 0 } } override fun getHead(): Head { - return head + return grpcHead } override fun getApi(): Reader { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt new file mode 100644 index 000000000..524c357e6 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt @@ -0,0 +1,121 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.grpc + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.api.proto.Common +import io.emeraldpay.api.proto.ReactorBlockchainGrpc +import io.emeraldpay.dshackle.Defaults +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.upstream.AbstractHead +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.dshackle.upstream.UpstreamAvailability +import io.emeraldpay.grpc.Chain +import org.reactivestreams.Publisher +import org.slf4j.LoggerFactory +import org.springframework.context.Lifecycle +import reactor.core.Disposable +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.util.retry.Retry +import java.time.Duration +import java.util.function.Function + +class GrpcHead( + private val chain: Chain, + private val parent: DefaultUpstream, + /** + * Converted from remote head details to the block container, which could be partial at this point + */ + private val converter: Function, + /** + * Populate block data with all missing details, of any + */ + private val enhancer: Function>? +) : AbstractHead(), Lifecycle { + + companion object { + private val log = LoggerFactory.getLogger(GrpcHead::class.java) + } + + private var headSubscription: Disposable? = null + + /** + * Initiate a new head subscription with connection to the remote + */ + fun start(remote: ReactorBlockchainGrpc.ReactorBlockchainStub) { + if (this.isRunning) { + stop() + } + + val source = Flux.concat( + // first connect immediately + Flux.just(remote), + // following requests do with delay, give it a time to recover + Flux.just(remote).repeat().delayElements(Defaults.retryConnection) + ).flatMap(this::subscribeHead) + + start(source) + } + + fun subscribeHead(client: ReactorBlockchainGrpc.ReactorBlockchainStub): Publisher { + val chainRef = Common.Chain.newBuilder() + .setTypeValue(chain.id) + .build() + return client.subscribeHead(chainRef) + // simple retry on failure, if eventually failed then it supposed to resubscribe later from outer method + .retryWhen(Retry.backoff(4, Duration.ofSeconds(1))) + .onErrorContinue { err, _ -> + log.warn("Disconnected $chain from ${parent.getId()}: ${err.message}") + parent.setStatus(UpstreamAvailability.UNAVAILABLE) + Mono.empty() + } + } + + /** + * Initiate a new head from provided source of head details + */ + fun start(source: Flux) { + var blocks = source.map(converter) + .distinctUntilChanged { + it.hash + }.filter { block -> + val curr = this.getCurrent() + curr == null || curr.difficulty < block.difficulty + } + if (enhancer != null) { + blocks = blocks.flatMap(enhancer) + } + + blocks = blocks.onErrorContinue { err, _ -> + log.error("Head subscription error. ${err.javaClass.name}:${err.message}", err) + } + + headSubscription = super.follow(blocks) + } + + override fun isRunning(): Boolean { + return !(headSubscription?.isDisposed ?: true) + } + + override fun start() { + log.error("Use start with provides source") + } + + override fun stop() { + headSubscription?.dispose() + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstream.kt new file mode 100644 index 000000000..c07778fd3 --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstream.kt @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.grpc + +import io.emeraldpay.api.proto.BlockchainOuterClass + +interface GrpcUpstream { + + /** + * Update the configuration of the upstream with the new data. + * Called on the first creation, and each time a new state received from upstream + */ + fun update(conf: BlockchainOuterClass.DescribeChain) + +} \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamStatus.kt new file mode 100644 index 000000000..5baff7ade --- /dev/null +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamStatus.kt @@ -0,0 +1,72 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.grpc + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.calls.CallMethods +import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods +import org.slf4j.LoggerFactory +import java.util.* +import java.util.concurrent.atomic.AtomicReference +import kotlin.collections.ArrayList + +class GrpcUpstreamStatus { + + companion object { + private val log = LoggerFactory.getLogger(GrpcUpstreamStatus::class.java) + } + + private val allLabels: AtomicReference> = AtomicReference(emptyList()) + private val nodes = AtomicReference(QuorumForLabels()) + private var targets: CallMethods? = null + + fun update(conf: BlockchainOuterClass.DescribeChain) { + val updateLabels = ArrayList() + val updateNodes = QuorumForLabels() + + conf.nodesList.forEach { remoteNode -> + val node = QuorumForLabels.QuorumItem(remoteNode.quorum, + remoteNode.labelsList.let { provided -> + val labels = UpstreamsConfig.Labels() + provided.forEach { + labels[it.name] = it.value + } + updateLabels.add(labels) + labels + } + ) + updateNodes.add(node) + } + + this.nodes.set(updateNodes) + this.allLabels.set(Collections.unmodifiableCollection(updateLabels)) + this.targets = DirectCallMethods(conf.supportedMethodsList.toSet()) + } + + fun getLabels(): Collection { + return allLabels.get() + } + + fun getNodes(): QuorumForLabels { + return nodes.get() + } + + fun getCallMethods(): CallMethods { + return targets ?: throw IllegalStateException("Upstream is not initialized yet") + } +} \ No newline at end of file diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index 16f9371a9..a57da6c01 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -16,7 +16,6 @@ */ package io.emeraldpay.dshackle.upstream.grpc -import com.fasterxml.jackson.databind.ObjectMapper import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.api.proto.ReactorBlockchainGrpc import io.emeraldpay.dshackle.BlockchainType @@ -25,6 +24,7 @@ import io.emeraldpay.dshackle.FileResolver import io.emeraldpay.dshackle.config.AuthConfig import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.startup.UpstreamChange +import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcGrpcClient import io.emeraldpay.grpc.Chain import io.grpc.ManagedChannelBuilder @@ -54,7 +54,7 @@ class GrpcUpstreams( var timeout = Defaults.timeout private var client: ReactorBlockchainGrpc.ReactorBlockchainStub? = null - private val known = HashMap() + private val known = HashMap() private val lock = ReentrantLock() fun start(): Flux { @@ -115,7 +115,7 @@ class GrpcUpstreams( try { val chain = Chain.byId(chainDetails.chain.number) val up = getOrCreate(chain) - (up.upstream as EthereumGrpcUpstream).init(chainDetails) + (up.upstream as GrpcUpstream).update(chainDetails) up } catch (e: Throwable) { log.warn("Skip unsupported upstream ${chainDetails.chain} on $id: ${e.message}") @@ -157,9 +157,17 @@ class GrpcUpstreams( } fun getOrCreate(chain: Chain): UpstreamChange { - if (BlockchainType.fromBlockchain(chain) != BlockchainType.ETHEREUM) { + val blockchainType = BlockchainType.fromBlockchain(chain) + if (blockchainType == BlockchainType.ETHEREUM) { + return getOrCreateEthereum(chain) + } else if (blockchainType == BlockchainType.BITCOIN) { + return getOrCreateBitcoin(chain) + } else { throw IllegalArgumentException("Unsupported blockchain: $chain") } + } + + fun getOrCreateEthereum(chain: Chain): UpstreamChange { lock.withLock { val current = known[chain] return if (current == null) { @@ -175,7 +183,23 @@ class GrpcUpstreams( } } - fun get(chain: Chain): EthereumGrpcUpstream { + fun getOrCreateBitcoin(chain: Chain): UpstreamChange { + lock.withLock { + val current = known[chain] + return if (current == null) { + val rpcClient = JsonRpcGrpcClient(client!!, chain) + val created = BitcoinGrpcUpstream(id, chain, client!!, rpcClient) + created.timeout = this.timeout + known[chain] = created + created.start() + UpstreamChange(chain, created, UpstreamChange.ChangeType.ADDED) + } else { + UpstreamChange(chain, current, UpstreamChange.ChangeType.REVALIDATED) + } + } + } + + fun get(chain: Chain): DefaultUpstream { return known[chain]!! } } \ No newline at end of file diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy index 7790ae74e..c659303fc 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/MultistreamHolderMock.groovy @@ -21,7 +21,7 @@ import io.emeraldpay.dshackle.BlockchainType import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream -import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream +import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream import io.emeraldpay.dshackle.upstream.calls.DefaultEthereumMethods import io.emeraldpay.dshackle.upstream.Upstream import io.emeraldpay.dshackle.upstream.MultistreamHolder @@ -55,8 +55,8 @@ class MultistreamHolderMock implements MultistreamHolder { } else if (BlockchainType.fromBlockchain(chain) == BlockchainType.BITCOIN) { if (up instanceof BitcoinMultistream) { upstreams[chain] = up - } else if (up instanceof BitcoinUpstream) { - upstreams[chain] = new BitcoinMultistream(chain, [up as BitcoinUpstream], Caches.default()) + } else if (up instanceof BitcoinRpcUpstream) { + upstreams[chain] = new BitcoinMultistream(chain, [up as BitcoinRpcUpstream], Caches.default()) } else { throw new IllegalArgumentException("Unsupported upstream type ${up.class}") } diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/TestingCommons.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/TestingCommons.groovy index 221581049..b4fa9663c 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/TestingCommons.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/TestingCommons.groovy @@ -34,8 +34,12 @@ import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.grpc.Chain import io.infinitape.etherjar.domain.BlockHash import io.infinitape.etherjar.rpc.json.BlockJson +import org.apache.commons.lang3.StringUtils +import java.time.Duration import java.time.Instant +import java.time.temporal.ChronoUnit +import java.time.temporal.TemporalUnit class TestingCommons { @@ -86,9 +90,27 @@ class TestingCommons { setNumber(height) setHash(BlockHash.from("0xc4b01774e426325b50f0c709753ec7cf1f1774439d587dfb91f2a4eeb8179cde")) setTotalDifficulty(BigInteger.ONE) - setTimestamp(Instant.now()) + setTimestamp(predictableTimestamp(height, 14)) } return BlockContainer.from(block) } + static BlockContainer blockForBitcoin(Long height) { + return new BlockContainer( + height, + BlockId.from(StringUtils.leftPad(height.toString(), 64, "0")), + BigInteger.valueOf(height), + predictableTimestamp(height, 60), + false, + null, + null, + [] + ) + } + + static Instant predictableTimestamp(Long x, int stepSeconds) { + //start from 1 Jan 2020 + Instant.ofEpochSecond(1577876400) + .plusSeconds(x * stepSeconds) + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumFullBlocksReaderSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumFullBlocksReaderSpec.groovy index c68f8bf0b..34b596a22 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumFullBlocksReaderSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumFullBlocksReaderSpec.groovy @@ -24,8 +24,6 @@ import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.data.BlockId import io.emeraldpay.dshackle.data.TxContainer import io.emeraldpay.dshackle.reader.Reader -import io.emeraldpay.dshackle.test.ReaderMock -import io.emeraldpay.dshackle.test.TestingCommons import io.infinitape.etherjar.domain.BlockHash import io.infinitape.etherjar.domain.TransactionId import io.infinitape.etherjar.rpc.json.BlockJson @@ -258,7 +256,7 @@ class EthereumFullBlocksReaderSpec extends Specification { "extraField2": "extraValue2" } ''' - blocks.add(BlockContainer.from(blockJson.bytes)) + blocks.add(BlockContainer.fromEthereumJson(blockJson.bytes)) def tx1 = ''' { diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy index fe125a634..aa26b997a 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy @@ -75,7 +75,8 @@ class EthereumGrpcUpstreamSpec extends Specification { }) def upstream = new EthereumGrpcUpstream("test", chain, client, new JsonRpcGrpcClient(client, chain)) upstream.setLag(0) - upstream.init(BlockchainOuterClass.DescribeChain.newBuilder() + upstream.update(BlockchainOuterClass.DescribeChain.newBuilder() + .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) .addAllSupportedMethods(["eth_getBlockByHash"]) .build()) when: @@ -132,7 +133,8 @@ class EthereumGrpcUpstreamSpec extends Specification { }) def upstream = new EthereumGrpcUpstream("test", Chain.ETHEREUM, client, new JsonRpcGrpcClient(client, Chain.ETHEREUM)) upstream.setLag(0) - upstream.init(BlockchainOuterClass.DescribeChain.newBuilder() + upstream.update(BlockchainOuterClass.DescribeChain.newBuilder() + .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) .addAllSupportedMethods(["eth_getBlockByHash"]) .build()) when: @@ -193,7 +195,8 @@ class EthereumGrpcUpstreamSpec extends Specification { }) def upstream = new EthereumGrpcUpstream("test", chain, client, new JsonRpcGrpcClient(client, chain)) upstream.setLag(0) - upstream.init(BlockchainOuterClass.DescribeChain.newBuilder() + upstream.update(BlockchainOuterClass.DescribeChain.newBuilder() + .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId)) .addAllSupportedMethods(["eth_getBlockByHash"]) .build()) when: diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/GrpcHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/GrpcHeadSpec.groovy new file mode 100644 index 000000000..4d888bbb4 --- /dev/null +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/GrpcHeadSpec.groovy @@ -0,0 +1,140 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.grpc + +import io.emeraldpay.api.proto.BlockchainGrpc +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.api.proto.Common +import io.emeraldpay.dshackle.data.BlockContainer +import io.emeraldpay.dshackle.test.MockGrpcServer +import io.emeraldpay.dshackle.test.TestingCommons +import io.emeraldpay.dshackle.upstream.DefaultUpstream +import io.emeraldpay.grpc.Chain +import io.grpc.stub.StreamObserver +import reactor.test.StepVerifier +import spock.lang.Specification + +import java.time.Duration +import java.util.function.Function + +class GrpcHeadSpec extends Specification { + + MockGrpcServer mockServer = new MockGrpcServer() + + def "Subscribe to remote"() { + setup: + def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() { + @Override + void subscribeHead(Common.Chain request, StreamObserver responseObserver) { + if (request.type.number != Chain.BITCOIN.id) { + responseObserver.onError(new IllegalStateException("Unsupported chain")) + return + } + [10, 11, 12, 14].forEach { height -> + responseObserver.onNext( + BlockchainOuterClass.ChainHead.newBuilder() + .setChain(request.type) + .setHeight(height) + .build() + ) + Thread.sleep(100) + } + responseObserver.onCompleted() + } + }) + def convert = { BlockchainOuterClass.ChainHead head -> + TestingCommons.blockForBitcoin(head.height) + } + def head = new GrpcHead( + Chain.BITCOIN, + Stub(DefaultUpstream), + convert, null + ) + when: + def act = head.getFlux() + .take(3) + head.start(client) + + then: + StepVerifier.create(act) + .expectNext(TestingCommons.blockForBitcoin(10)) + .expectNext(TestingCommons.blockForBitcoin(11)) + .expectNext(TestingCommons.blockForBitcoin(12)) + .expectComplete() + .verify(Duration.ofSeconds(5)) + } + + def "Reconnect to remote on error"() { + setup: + int phase = 0 + def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() { + @Override + void subscribeHead(Common.Chain request, StreamObserver responseObserver) { + if (request.type.number != Chain.BITCOIN.id) { + responseObserver.onError(new IllegalStateException("Unsupported chain")) + return + } + phase++ + if (phase == 1) { + responseObserver.onError(new RuntimeException("Phase 1 error")) + } else if (phase == 2) { + [10, 11].forEach { height -> + responseObserver.onNext( + BlockchainOuterClass.ChainHead.newBuilder() + .setChain(request.type) + .setHeight(height) + .build() + ) + Thread.sleep(100) + } + responseObserver.onError(new RuntimeException("Phase 2 error")) + } else if (phase == 3) { + [11, 12, 13].forEach { height -> + responseObserver.onNext( + BlockchainOuterClass.ChainHead.newBuilder() + .setChain(request.type) + .setHeight(height) + .build() + ) + Thread.sleep(100) + } + responseObserver.onCompleted() + } + } + }) + def convert = { BlockchainOuterClass.ChainHead head -> + TestingCommons.blockForBitcoin(head.height) + } + def head = new GrpcHead( + Chain.BITCOIN, + Stub(DefaultUpstream), + convert, null + ) + when: + def act = head.getFlux() + .take(3) + head.start(client) + + then: + StepVerifier.create(act) + .expectNext(TestingCommons.blockForBitcoin(10)) + .expectNext(TestingCommons.blockForBitcoin(11)) + .expectNext(TestingCommons.blockForBitcoin(12)) + .expectComplete() + .verify(Duration.ofSeconds(5)) + } + +} diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamStatusSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamStatusSpec.groovy new file mode 100644 index 000000000..2f1df05be --- /dev/null +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreamStatusSpec.groovy @@ -0,0 +1,128 @@ +/** + * Copyright (c) 2020 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.grpc + +import io.emeraldpay.api.proto.BlockchainOuterClass +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.startup.QuorumForLabels +import io.emeraldpay.dshackle.upstream.calls.DirectCallMethods +import spock.lang.Specification + +class GrpcUpstreamStatusSpec extends Specification { + + def "Updates with new labels"() { + setup: + def status = new GrpcUpstreamStatus() + when: + status.update( + BlockchainOuterClass.DescribeChain.newBuilder() + .addNodes( + BlockchainOuterClass.NodeDetails.newBuilder() + .setQuorum(1) + .addLabels( + BlockchainOuterClass.Label.newBuilder().setName("test").setValue("foo") + ) + ) + .build() + ) + def act = status.getLabels() + then: + act.toList() == [ + UpstreamsConfig.Labels.fromMap([test: "foo"]) + ] + + // replace with new value + when: + status.update( + BlockchainOuterClass.DescribeChain.newBuilder() + .addNodes( + BlockchainOuterClass.NodeDetails.newBuilder() + .setQuorum(1) + .addLabels( + BlockchainOuterClass.Label.newBuilder().setName("test").setValue("bar") + ) + ) + .build() + ) + act = status.getLabels() + then: + act.toList() == [ + UpstreamsConfig.Labels.fromMap([test: "bar"]) + ] + + // more values + when: + status.update( + BlockchainOuterClass.DescribeChain.newBuilder() + .addNodes( + BlockchainOuterClass.NodeDetails.newBuilder() + .setQuorum(1) + .addLabels( + BlockchainOuterClass.Label.newBuilder().setName("test1").setValue("bar") + ) + .addLabels( + BlockchainOuterClass.Label.newBuilder().setName("test2").setValue("baz") + ) + ) + .build() + ) + act = status.getLabels() + then: + act.toList() == [ + UpstreamsConfig.Labels.fromMap([test1: "bar", test2: "baz"]) + ] + } + + def "Updates with new nodes"() { + setup: + def status = new GrpcUpstreamStatus() + when: + status.update( + BlockchainOuterClass.DescribeChain.newBuilder() + .addNodes( + BlockchainOuterClass.NodeDetails.newBuilder() + .setQuorum(1) + .addLabels( + BlockchainOuterClass.Label.newBuilder().setName("test").setValue("foo") + ) + ) + .build() + ) + def act = status.getNodes() + then: + act == new QuorumForLabels().tap { + it.add(new QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap([test: "foo"]))) + } + } + + def "Updates with methods"() { + setup: + def status = new GrpcUpstreamStatus() + when: + status.update( + BlockchainOuterClass.DescribeChain.newBuilder() + .addAllSupportedMethods([ + "test_1", + "test_2" + ]) + .build() + ) + def act = status.getCallMethods() + then: + act.supportedMethods == ["test_1", "test_2"].toSet() + act instanceof DirectCallMethods + } +}