diff --git a/docs/reference-configuration.adoc b/docs/reference-configuration.adoc index 3bc659ab2..eabb67ab9 100644 --- a/docs/reference-configuration.adoc +++ b/docs/reference-configuration.adoc @@ -698,6 +698,10 @@ Enable/disable the upstream. Used to select an upstream per-request. See link:09-quorum-and-selectors.adoc[Quorum and Selectors] +| `options` +| +| Other configuration options. See <> + | `methods` | no | Enable (`enabled`) or disable (`disabled`) additional JSON RPC methods that are provided by that particular upstream @@ -729,6 +733,54 @@ Configuration has no effect in Proof-of-Work blockchains. |=== +[#general-options] +=== General Upstream Options + +[cols="2a,1,1a,5"] +|=== +| Option | Type | Default | Description + +| `disable-validation` +| boolean +| `false` +| Disables all the validations of the upstream. I.e., it turns off `validate-peers` and `validate-syncing` checks if set to `true`. + +| `validation-interval` +| number +| `30` +| Period in seconds to re-validate the upstream. + +| `validate-peers` +| boolean +| `true` +| Disables validation of the peers connected to the upstream (as `net_peerCount` method). +Dshackle assumes that if there are too few peers then the Upstream is just started and may produce invalid/outdated responses + +| `min-peers` +| number +| `1` +| The minimum number of connected peer to consider the upstream valid if `validate-peers` is enabled. +If it's set to `0` it essentially disables the peer validation. + +| `validate-syncing` +| boolean +| `true` +| Disables checking for the state of syncing on the upstream (as `eth_syncing` method). +If the Upstream is in _syncing_ state then the Dshackle doesn't use it for call until it reaches the blockchain head. + +| `timeout` +| number +| `60` +| Timeout in seconds to wait for an answer from the upstream before considering it as failed. + +| `balance` +| boolean +| +| Suitable for Bitcoin upstream. +Tells if the Upstream can be used to call balance methods, which requires that the node has the indexing as turned on. + +|=== + ==== Ethereum Connection Options .Connection Config for Ethereum Upstream diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt index 218e5ac6b..dfa6ca836 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt @@ -33,6 +33,13 @@ open class UpstreamsConfig { open class Options { var disableValidation: Boolean? = null + var validationInterval: Int = 30 + set(value) { + require(value > 0) { + "validation-interval must be a positive number: $value" + } + field = value + } var timeout = Defaults.timeout var providesBalance: Boolean? = null var priority: Int = DEFAULT_PRIORITY @@ -42,26 +49,30 @@ open class UpstreamsConfig { } field = value } - + var validatePeers: Boolean = true var minPeers: Int? = 1 set(value) { - require(value != null && value > 0) { - "minPeers must be a positive number: $value" + require(value != null && value >= 0) { + "min-peers must be a positive number: $value" } field = value } + var validateSyncing: Boolean = true - fun merge(additional: Options?): Options { - if (additional == null) { + fun merge(overwrites: Options?): Options { + if (overwrites == null) { return this } val copy = Options() - copy.priority = this.priority.coerceAtLeast(additional.priority) - copy.minPeers = if (this.minPeers != null) this.minPeers else additional.minPeers + copy.priority = this.priority.coerceAtLeast(overwrites.priority) + copy.validatePeers = this.validatePeers && overwrites.validatePeers + copy.minPeers = if (this.minPeers != null) this.minPeers else overwrites.minPeers copy.disableValidation = - if (this.disableValidation != null) this.disableValidation else additional.disableValidation + if (this.disableValidation != null) this.disableValidation else overwrites.disableValidation + copy.validationInterval = overwrites.validationInterval copy.providesBalance = - if (this.providesBalance != null) this.providesBalance else additional.providesBalance + if (this.providesBalance != null) this.providesBalance else overwrites.providesBalance + copy.validateSyncing = this.validateSyncing && overwrites.validateSyncing return copy } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt index 3adc6d8ae..ab2b189bc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfigReader.kt @@ -307,6 +307,12 @@ class UpstreamsConfigReader( internal fun readOptions(values: MappingNode): UpstreamsConfig.Options { val options = UpstreamsConfig.Options() + getValueAsBool(values, "validate-peers")?.let { + options.validatePeers = it + } + getValueAsBool(values, "validate-syncing")?.let { + options.validateSyncing = it + } getValueAsInt(values, "min-peers")?.let { options.minPeers = it } @@ -316,6 +322,9 @@ class UpstreamsConfigReader( getValueAsBool(values, "disable-validation")?.let { options.disableValidation = it } + getValueAsInt(values, "validation-interval")?.let { + options.validationInterval = it + } getValueAsBool(values, "balance")?.let { options.providesBalance = it } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index b3a4831e3..6f67a5a39 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -84,8 +84,8 @@ open class ConfiguredUpstreams( log.error("Chain is unknown: ${up.chain}") return@forEach } - val options = (up.options ?: UpstreamsConfig.Options()) - .merge(defaultOptions[chain] ?: UpstreamsConfig.Options.getDefaults()) + val options = (defaultOptions[chain] ?: UpstreamsConfig.Options.getDefaults()) + .merge(up.options ?: UpstreamsConfig.Options()) when (BlockchainType.from(chain)) { BlockchainType.ETHEREUM -> { buildEthereumUpstream(up.cast(UpstreamsConfig.EthereumConnection::class.java), chain, options) @@ -117,9 +117,6 @@ open class ConfiguredUpstreams( } } } - defaultOptions.keys.forEach { chain -> - defaultOptions[chain] = defaultOptions[chain]!!.merge(UpstreamsConfig.Options.getDefaults()) - } return defaultOptions } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt index cc7feca80..79955ef8a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidator.kt @@ -29,6 +29,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers +import reactor.util.function.Tuple2 import java.time.Duration import java.util.concurrent.Executors import java.util.concurrent.TimeoutException @@ -46,6 +47,23 @@ open class EthereumUpstreamValidator( private val objectMapper: ObjectMapper = Global.objectMapper open fun validate(): Mono { + return Mono.zip( + validateSyncing(), + validatePeers() + ) + .map(::resolve) + .defaultIfEmpty(UpstreamAvailability.UNAVAILABLE) + .onErrorReturn(UpstreamAvailability.UNAVAILABLE) + } + + fun resolve(results: Tuple2): UpstreamAvailability { + return if (results.t1.isBetterTo(results.t2)) results.t2 else results.t1 + } + + fun validateSyncing(): Mono { + if (!options.validateSyncing) { + return Mono.just(UpstreamAvailability.OK) + } return upstream .getApi() .read(JsonRpcRequest("eth_syncing", listOf())) @@ -56,35 +74,43 @@ open class EthereumUpstreamValidator( Mono.fromCallable { log.warn("No response for eth_syncing from ${upstream.getId()}") } .then(Mono.error(TimeoutException("Validation timeout for Syncing"))) ) - .flatMap { value -> + .map { value -> if (value.isSyncing) { - Mono.just(UpstreamAvailability.SYNCING) + UpstreamAvailability.SYNCING + } else { + UpstreamAvailability.OK + } + } + .onErrorReturn(UpstreamAvailability.UNAVAILABLE) + } + + fun validatePeers(): Mono { + if (!options.validatePeers || options.minPeers == 0) { + return Mono.just(UpstreamAvailability.OK) + } + return upstream + .getApi() + .read(JsonRpcRequest("net_peerCount", listOf())) + .flatMap(JsonRpcResponse::requireStringResult) + .map(Integer::decode) + .timeout( + Defaults.timeoutInternal, + Mono.fromCallable { log.warn("No response for net_peerCount from ${upstream.getId()}") } + .then(Mono.error(TimeoutException("Validation timeout for Peers"))) + ) + .map { count -> + val minPeers = options.minPeers ?: 1 + if (count < minPeers) { + UpstreamAvailability.IMMATURE } else { - upstream - .getApi() - .read(JsonRpcRequest("net_peerCount", listOf())) - .flatMap(JsonRpcResponse::requireStringResult) - .map(Integer::decode) - .timeout( - Defaults.timeoutInternal, - Mono.fromCallable { log.warn("No response for net_peerCount from ${upstream.getId()}") } - .then(Mono.error(TimeoutException("Validation timeout for Peers"))) - ) - .map { count -> - val minPeers = options.minPeers ?: 1 - if (count < minPeers) { - UpstreamAvailability.IMMATURE - } else { - UpstreamAvailability.OK - } - } + UpstreamAvailability.OK } } .onErrorReturn(UpstreamAvailability.UNAVAILABLE) } fun start(): Flux { - return Flux.interval(Duration.ofSeconds(15)) + return Flux.interval(Duration.ofSeconds(options.validationInterval.toLong())) .subscribeOn(scheduler) .flatMap { validate() diff --git a/src/test/groovy/io/emeraldpay/dshackle/config/UpstreamsConfigReaderSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/config/UpstreamsConfigReaderSpec.groovy index 7bc1d8230..a41a039da 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/config/UpstreamsConfigReaderSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/config/UpstreamsConfigReaderSpec.groovy @@ -396,4 +396,29 @@ class UpstreamsConfigReaderSpec extends Specification { act.upstreams.get(0).role == UpstreamsConfig.UpstreamRole.PRIMARY act.upstreams.get(1).role == UpstreamsConfig.UpstreamRole.PRIMARY } + + def "Parse config with validation options"() { + setup: + def config = this.class.getClassLoader().getResourceAsStream("configs/upstreams-validation.yaml") + when: + def act = reader.read(config) + then: + act != null + act.upstreams.size() == 3 + with(act.upstreams.get(0).options) { + disableValidation == false + validateSyncing == true + validatePeers == false + } + with(act.upstreams.get(1).options) { + disableValidation == false + validateSyncing == false + validatePeers == false + } + with(act.upstreams.get(2).options) { + disableValidation == true + validateSyncing == true + validatePeers == true + } + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy new file mode 100644 index 000000000..89325f9c2 --- /dev/null +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/EthereumUpstreamValidatorSpec.groovy @@ -0,0 +1,225 @@ +/** + * Copyright (c) 2022 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream.ethereum + +import io.emeraldpay.dshackle.config.UpstreamsConfig +import io.emeraldpay.dshackle.test.ApiReaderMock +import io.emeraldpay.dshackle.test.TestingCommons +import io.emeraldpay.etherjar.rpc.RpcResponseError +import reactor.util.function.Tuples +import spock.lang.Specification + +import java.time.Duration + +import static io.emeraldpay.dshackle.upstream.UpstreamAvailability.* + +class EthereumUpstreamValidatorSpec extends Specification { + + def "Resolve to final availability"() { + setup: + def validator = new EthereumUpstreamValidator(Stub(EthereumUpstream), UpstreamsConfig.Options.getDefaults()) + expect: + validator.resolve(Tuples.of(sync, peers)) == exp + where: + exp | sync | peers + OK | OK | OK + IMMATURE | OK | IMMATURE + UNAVAILABLE | OK | UNAVAILABLE + SYNCING | SYNCING | OK + SYNCING | SYNCING | IMMATURE + UNAVAILABLE | SYNCING | UNAVAILABLE + UNAVAILABLE | UNAVAILABLE | OK + UNAVAILABLE | UNAVAILABLE | IMMATURE + UNAVAILABLE | UNAVAILABLE | UNAVAILABLE + } + + def "Doesnt check eth_syncing when disabled"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validateSyncing = false + } + def up = Mock(EthereumUpstream) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validateSyncing().block(Duration.ofSeconds(1)) + then: + act == OK + 0 * up.getApi() + } + + def "Syncing is OK when false returned from upstream"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validateSyncing = true + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("eth_syncing", [], false) + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validateSyncing().block(Duration.ofSeconds(1)) + then: + act == OK + } + + def "Syncing is SYNCING when state returned from upstream"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validateSyncing = true + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("eth_syncing", [], [startingBlock: 100, currentBlock: 50]) + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validateSyncing().block(Duration.ofSeconds(1)) + then: + act == SYNCING + } + + def "Syncing is UNAVAILABLE when error returned from upstream"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validateSyncing = true + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("eth_syncing", [], new RpcResponseError(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unavailable")) + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validateSyncing().block(Duration.ofSeconds(1)) + then: + act == UNAVAILABLE + } + + def "Doesnt validate peers when disabled"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validatePeers = false + it.minPeers = 10 + } + def up = Mock(EthereumUpstream) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validatePeers().block(Duration.ofSeconds(1)) + then: + act == OK + 0 * up.getApi() + } + + def "Doesnt validate peers when zero peers is expected"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validatePeers = true + it.minPeers = 0 + } + def up = Mock(EthereumUpstream) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validatePeers().block(Duration.ofSeconds(1)) + then: + act == OK + 0 * up.getApi() + } + + def "Peers is IMMATURE when state returned too few peers"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validatePeers = true + it.minPeers = 10 + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("net_peerCount", [], "0x5") + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validatePeers().block(Duration.ofSeconds(1)) + then: + act == IMMATURE + } + + def "Peers is OK when state returned exactly min peers"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validatePeers = true + it.minPeers = 10 + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("net_peerCount", [], "0xa") + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validatePeers().block(Duration.ofSeconds(1)) + then: + act == OK + } + + def "Peers is OK when state returned more than enough peers"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validatePeers = true + it.minPeers = 10 + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("net_peerCount", [], "0xff") + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validatePeers().block(Duration.ofSeconds(1)) + then: + act == OK + } + + def "Peers is UNAVAILABLE when state returned error"() { + setup: + def options = UpstreamsConfig.Options.getDefaults().tap { + it.validatePeers = true + it.minPeers = 10 + } + def up = TestingCommons.upstream( + new ApiReaderMock().tap { + answer("net_peerCount", [], new RpcResponseError(RpcResponseError.CODE_METHOD_NOT_EXIST, "Unavailable")) + } + ) + def validator = new EthereumUpstreamValidator(up, options) + + when: + def act = validator.validatePeers().block(Duration.ofSeconds(1)) + then: + act == UNAVAILABLE + } +} diff --git a/src/test/resources/configs/upstreams-validation.yaml b/src/test/resources/configs/upstreams-validation.yaml new file mode 100644 index 000000000..5a90d19aa --- /dev/null +++ b/src/test/resources/configs/upstreams-validation.yaml @@ -0,0 +1,36 @@ +version: v1 + +upstreams: + + - id: test-1 + chain: ethereum + options: + disable-validation: false + validate-syncing: true + validate-peers: false + connection: + ethereum: + rpc: + url: "http://localhost:8545" + + - id: test-2 + chain: ethereum + options: + disable-validation: false + validate-syncing: false + validate-peers: false + connection: + ethereum: + rpc: + url: "http://localhost:8546" + + - id: test-3 + chain: ethereum + options: + disable-validation: true + validate-syncing: true + validate-peers: true + connection: + ethereum: + rpc: + url: "http://localhost:8547"