Skip to content

Commit

Permalink
solution: more granular config for upstream validation
Browse files Browse the repository at this point in the history
  • Loading branch information
splix committed Oct 4, 2022
1 parent db9b331 commit dec4280
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 35 deletions.
52 changes: 52 additions & 0 deletions docs/reference-configuration.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,10 @@ Enable/disable the upstream.
Used to select an upstream per-request.
See link:09-quorum-and-selectors.adoc[Quorum and Selectors]

| `options`
|
| Other configuration options. See <<general-options>>

| `methods`
| no
| Enable (`enabled`) or disable (`disabled`) additional JSON RPC methods that are provided by that particular upstream
Expand Down Expand Up @@ -729,6 +733,54 @@ Configuration has no effect in Proof-of-Work blockchains.

|===

[#general-options]
=== General Upstream Options

[cols="2a,1,1a,5"]
|===
| Option | Type | Default | Description

| `disable-validation`
| boolean
| `false`
| Disables all the validations of the upstream. I.e., it turns off `validate-peers` and `validate-syncing` checks if set to `true`.

| `validation-interval`
| number
| `30`
| Period in seconds to re-validate the upstream.

| `validate-peers`
| boolean
| `true`
| Disables validation of the peers connected to the upstream (as `net_peerCount` method).
Dshackle assumes that if there are too few peers then the Upstream is just started and may produce invalid/outdated responses

| `min-peers`
| number
| `1`
| The minimum number of connected peer to consider the upstream valid if `validate-peers` is enabled.
If it's set to `0` it essentially disables the peer validation.

| `validate-syncing`
| boolean
| `true`
| Disables checking for the state of syncing on the upstream (as `eth_syncing` method).
If the Upstream is in _syncing_ state then the Dshackle doesn't use it for call until it reaches the blockchain head.

| `timeout`
| number
| `60`
| Timeout in seconds to wait for an answer from the upstream before considering it as failed.

| `balance`
| boolean
|
| Suitable for Bitcoin upstream.
Tells if the Upstream can be used to call balance methods, which requires that the node has the indexing as turned on.

|===

==== Ethereum Connection Options

.Connection Config for Ethereum Upstream
Expand Down
29 changes: 20 additions & 9 deletions src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ open class UpstreamsConfig {

open class Options {
var disableValidation: Boolean? = null
var validationInterval: Int = 30
set(value) {
require(value > 0) {
"validation-interval must be a positive number: $value"
}
field = value
}
var timeout = Defaults.timeout
var providesBalance: Boolean? = null
var priority: Int = DEFAULT_PRIORITY
Expand All @@ -42,26 +49,30 @@ open class UpstreamsConfig {
}
field = value
}

var validatePeers: Boolean = true
var minPeers: Int? = 1
set(value) {
require(value != null && value > 0) {
"minPeers must be a positive number: $value"
require(value != null && value >= 0) {
"min-peers must be a positive number: $value"
}
field = value
}
var validateSyncing: Boolean = true

fun merge(additional: Options?): Options {
if (additional == null) {
fun merge(overwrites: Options?): Options {
if (overwrites == null) {
return this
}
val copy = Options()
copy.priority = this.priority.coerceAtLeast(additional.priority)
copy.minPeers = if (this.minPeers != null) this.minPeers else additional.minPeers
copy.priority = this.priority.coerceAtLeast(overwrites.priority)
copy.validatePeers = this.validatePeers && overwrites.validatePeers
copy.minPeers = if (this.minPeers != null) this.minPeers else overwrites.minPeers
copy.disableValidation =
if (this.disableValidation != null) this.disableValidation else additional.disableValidation
if (this.disableValidation != null) this.disableValidation else overwrites.disableValidation
copy.validationInterval = overwrites.validationInterval
copy.providesBalance =
if (this.providesBalance != null) this.providesBalance else additional.providesBalance
if (this.providesBalance != null) this.providesBalance else overwrites.providesBalance
copy.validateSyncing = this.validateSyncing && overwrites.validateSyncing
return copy
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ class UpstreamsConfigReader(

internal fun readOptions(values: MappingNode): UpstreamsConfig.Options {
val options = UpstreamsConfig.Options()
getValueAsBool(values, "validate-peers")?.let {
options.validatePeers = it
}
getValueAsBool(values, "validate-syncing")?.let {
options.validateSyncing = it
}
getValueAsInt(values, "min-peers")?.let {
options.minPeers = it
}
Expand All @@ -316,6 +322,9 @@ class UpstreamsConfigReader(
getValueAsBool(values, "disable-validation")?.let {
options.disableValidation = it
}
getValueAsInt(values, "validation-interval")?.let {
options.validationInterval = it
}
getValueAsBool(values, "balance")?.let {
options.providesBalance = it
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ open class ConfiguredUpstreams(
log.error("Chain is unknown: ${up.chain}")
return@forEach
}
val options = (up.options ?: UpstreamsConfig.Options())
.merge(defaultOptions[chain] ?: UpstreamsConfig.Options.getDefaults())
val options = (defaultOptions[chain] ?: UpstreamsConfig.Options.getDefaults())
.merge(up.options ?: UpstreamsConfig.Options())
when (BlockchainType.from(chain)) {
BlockchainType.ETHEREUM -> {
buildEthereumUpstream(up.cast(UpstreamsConfig.EthereumConnection::class.java), chain, options)
Expand Down Expand Up @@ -117,9 +117,6 @@ open class ConfiguredUpstreams(
}
}
}
defaultOptions.keys.forEach { chain ->
defaultOptions[chain] = defaultOptions[chain]!!.merge(UpstreamsConfig.Options.getDefaults())
}
return defaultOptions
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.function.Tuple2
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
Expand All @@ -46,6 +47,23 @@ open class EthereumUpstreamValidator(
private val objectMapper: ObjectMapper = Global.objectMapper

open fun validate(): Mono<UpstreamAvailability> {
return Mono.zip(
validateSyncing(),
validatePeers()
)
.map(::resolve)
.defaultIfEmpty(UpstreamAvailability.UNAVAILABLE)
.onErrorReturn(UpstreamAvailability.UNAVAILABLE)
}

fun resolve(results: Tuple2<UpstreamAvailability, UpstreamAvailability>): UpstreamAvailability {
return if (results.t1.isBetterTo(results.t2)) results.t2 else results.t1
}

fun validateSyncing(): Mono<UpstreamAvailability> {
if (!options.validateSyncing) {
return Mono.just(UpstreamAvailability.OK)
}
return upstream
.getApi()
.read(JsonRpcRequest("eth_syncing", listOf()))
Expand All @@ -56,35 +74,43 @@ open class EthereumUpstreamValidator(
Mono.fromCallable { log.warn("No response for eth_syncing from ${upstream.getId()}") }
.then(Mono.error(TimeoutException("Validation timeout for Syncing")))
)
.flatMap { value ->
.map { value ->
if (value.isSyncing) {
Mono.just(UpstreamAvailability.SYNCING)
UpstreamAvailability.SYNCING
} else {
UpstreamAvailability.OK
}
}
.onErrorReturn(UpstreamAvailability.UNAVAILABLE)
}

fun validatePeers(): Mono<UpstreamAvailability> {
if (!options.validatePeers || options.minPeers == 0) {
return Mono.just(UpstreamAvailability.OK)
}
return upstream
.getApi()
.read(JsonRpcRequest("net_peerCount", listOf()))
.flatMap(JsonRpcResponse::requireStringResult)
.map(Integer::decode)
.timeout(
Defaults.timeoutInternal,
Mono.fromCallable { log.warn("No response for net_peerCount from ${upstream.getId()}") }
.then(Mono.error(TimeoutException("Validation timeout for Peers")))
)
.map { count ->
val minPeers = options.minPeers ?: 1
if (count < minPeers) {
UpstreamAvailability.IMMATURE
} else {
upstream
.getApi()
.read(JsonRpcRequest("net_peerCount", listOf()))
.flatMap(JsonRpcResponse::requireStringResult)
.map(Integer::decode)
.timeout(
Defaults.timeoutInternal,
Mono.fromCallable { log.warn("No response for net_peerCount from ${upstream.getId()}") }
.then(Mono.error(TimeoutException("Validation timeout for Peers")))
)
.map { count ->
val minPeers = options.minPeers ?: 1
if (count < minPeers) {
UpstreamAvailability.IMMATURE
} else {
UpstreamAvailability.OK
}
}
UpstreamAvailability.OK
}
}
.onErrorReturn(UpstreamAvailability.UNAVAILABLE)
}

fun start(): Flux<UpstreamAvailability> {
return Flux.interval(Duration.ofSeconds(15))
return Flux.interval(Duration.ofSeconds(options.validationInterval.toLong()))
.subscribeOn(scheduler)
.flatMap {
validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,29 @@ class UpstreamsConfigReaderSpec extends Specification {
act.upstreams.get(0).role == UpstreamsConfig.UpstreamRole.PRIMARY
act.upstreams.get(1).role == UpstreamsConfig.UpstreamRole.PRIMARY
}

def "Parse config with validation options"() {
setup:
def config = this.class.getClassLoader().getResourceAsStream("configs/upstreams-validation.yaml")
when:
def act = reader.read(config)
then:
act != null
act.upstreams.size() == 3
with(act.upstreams.get(0).options) {
disableValidation == false
validateSyncing == true
validatePeers == false
}
with(act.upstreams.get(1).options) {
disableValidation == false
validateSyncing == false
validatePeers == false
}
with(act.upstreams.get(2).options) {
disableValidation == true
validateSyncing == true
validatePeers == true
}
}
}
Loading

0 comments on commit dec4280

Please sign in to comment.