Skip to content

Commit

Permalink
problem: upstream options are not always correctly merged when define…
Browse files Browse the repository at this point in the history
…d in few places
  • Loading branch information
splix committed Oct 4, 2022
1 parent dec4280 commit 10ec822
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 76 deletions.
81 changes: 55 additions & 26 deletions src/main/kotlin/io/emeraldpay/dshackle/config/UpstreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package io.emeraldpay.dshackle.config

import io.emeraldpay.dshackle.Defaults
import org.apache.commons.lang3.ObjectUtils
import java.net.URI
import java.time.Duration
import java.util.Arrays
import java.util.Locale

Expand All @@ -29,73 +31,100 @@ open class UpstreamsConfig {
private const val MIN_PRIORITY = 0
private const val MAX_PRIORITY = 1_000_000
private const val DEFAULT_PRIORITY = 10
private const val DEFAULT_VALIDATION_INTERVAL = 30
}

open class Options {
data class Options(
var disableValidation: Boolean,
var validationInterval: Duration,
var timeout: Duration,
var providesBalance: Boolean,
var priority: Int,
var validatePeers: Boolean,
var minPeers: Int,
var validateSyncing: Boolean
)

open class PartialOptions {

var disableValidation: Boolean? = null
var validationInterval: Int = 30
var validationInterval: Int? = null
set(value) {
require(value > 0) {
require(value == null || value > 0) {
"validation-interval must be a positive number: $value"
}
field = value
}
var timeout = Defaults.timeout

var timeout: Int? = null
var providesBalance: Boolean? = null
var priority: Int = DEFAULT_PRIORITY
var priority: Int? = null
set(value) {
require(value in MIN_PRIORITY..MAX_PRIORITY) {
require(value == null || value in MIN_PRIORITY..MAX_PRIORITY) {
"Upstream priority must be in $MIN_PRIORITY..$MAX_PRIORITY. Configured: $value"
}
field = value
}
var validatePeers: Boolean = true
var minPeers: Int? = 1
var validatePeers: Boolean? = null
var minPeers: Int? = null
set(value) {
require(value != null && value >= 0) {
require(value == null || value >= 0) {
"min-peers must be a positive number: $value"
}
field = value
}
var validateSyncing: Boolean = true
var validateSyncing: Boolean? = null

fun merge(overwrites: Options?): Options {
fun merge(overwrites: PartialOptions?): PartialOptions {
if (overwrites == null) {
return this
}
val copy = Options()
copy.priority = this.priority.coerceAtLeast(overwrites.priority)
copy.validatePeers = this.validatePeers && overwrites.validatePeers
copy.minPeers = if (this.minPeers != null) this.minPeers else overwrites.minPeers
copy.disableValidation =
if (this.disableValidation != null) this.disableValidation else overwrites.disableValidation
copy.validationInterval = overwrites.validationInterval
copy.providesBalance =
if (this.providesBalance != null) this.providesBalance else overwrites.providesBalance
copy.validateSyncing = this.validateSyncing && overwrites.validateSyncing
val copy = PartialOptions()
copy.disableValidation = ObjectUtils.firstNonNull(overwrites.disableValidation, this.disableValidation)
copy.validationInterval = ObjectUtils.firstNonNull(overwrites.validationInterval, this.validationInterval)
copy.timeout = ObjectUtils.firstNonNull(overwrites.timeout, this.timeout)
copy.providesBalance = ObjectUtils.firstNonNull(overwrites.providesBalance, this.providesBalance)
copy.priority = ObjectUtils.firstNonNull(overwrites.priority, this.priority)
copy.validatePeers = ObjectUtils.firstNonNull(overwrites.validatePeers, this.validatePeers)
copy.minPeers = ObjectUtils.firstNonNull(overwrites.minPeers, this.minPeers)
copy.validateSyncing = ObjectUtils.firstNonNull(overwrites.validateSyncing, this.validateSyncing)
return copy
}

fun build(): Options {
return Options(
disableValidation = ObjectUtils.firstNonNull(this.disableValidation, false)!!,
validationInterval = ObjectUtils.firstNonNull(this.validationInterval, DEFAULT_VALIDATION_INTERVAL)!!
.toLong().let(Duration::ofSeconds),
timeout = ObjectUtils.firstNonNull(this.timeout?.toLong()?.let(Duration::ofSeconds), Defaults.timeout)!!,
providesBalance = ObjectUtils.firstNonNull(this.providesBalance, false)!!,
priority = ObjectUtils.firstNonNull(this.priority, DEFAULT_PRIORITY)!!,
validatePeers = ObjectUtils.firstNonNull(this.validatePeers, true)!!,
minPeers = ObjectUtils.firstNonNull(this.minPeers, 1)!!,
validateSyncing = ObjectUtils.firstNonNull(this.validateSyncing, true)!!,
)
}

companion object {
@JvmStatic
fun getDefaults(): Options {
val options = Options()
fun getDefaults(): PartialOptions {
val options = PartialOptions()
options.minPeers = 1
options.disableValidation = false
return options
}
}
}

class DefaultOptions : Options() {
class DefaultOptions : PartialOptions() {
var chains: List<String>? = null
var options: Options? = null
var options: PartialOptions? = null
}

class Upstream<T : UpstreamConnection> {
var id: String? = null
var chain: String? = null
var options: Options? = null
var options: PartialOptions? = null
var isEnabled = true
var connection: T? = null
val labels = Labels()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.yaml.snakeyaml.nodes.ScalarNode
import reactor.util.function.Tuples
import java.io.InputStream
import java.net.URI
import java.time.Duration
import java.util.Locale

class UpstreamsConfigReader(
Expand Down Expand Up @@ -217,7 +216,7 @@ class UpstreamsConfigReader(
upstream.methods = tryReadMethods(upNode)
getValueAsInt(upNode, "priority")?.let {
if (upstream.options == null) {
upstream.options = UpstreamsConfig.Options.getDefaults()
upstream.options = UpstreamsConfig.PartialOptions.getDefaults()
}
upstream.options!!.priority = it
}
Expand Down Expand Up @@ -270,7 +269,7 @@ class UpstreamsConfigReader(
}
}

internal fun tryReadOptions(upNode: MappingNode): UpstreamsConfig.Options? {
internal fun tryReadOptions(upNode: MappingNode): UpstreamsConfig.PartialOptions? {
return if (hasAny(upNode, "options")) {
return getMapping(upNode, "options")?.let { values ->
readOptions(values)
Expand Down Expand Up @@ -305,8 +304,8 @@ class UpstreamsConfigReader(
}
}

internal fun readOptions(values: MappingNode): UpstreamsConfig.Options {
val options = UpstreamsConfig.Options()
internal fun readOptions(values: MappingNode): UpstreamsConfig.PartialOptions {
val options = UpstreamsConfig.PartialOptions()
getValueAsBool(values, "validate-peers")?.let {
options.validatePeers = it
}
Expand All @@ -317,7 +316,7 @@ class UpstreamsConfigReader(
options.minPeers = it
}
getValueAsInt(values, "timeout")?.let {
options.timeout = Duration.ofSeconds(it.toLong())
options.timeout = it
}
getValueAsBool(values, "disable-validation")?.let {
options.disableValidation = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,22 @@ open class ConfiguredUpstreams(
}
log.debug("Start upstream ${up.id}")
if (up.connection is UpstreamsConfig.GrpcConnection) {
val options = up.options ?: UpstreamsConfig.Options()
buildGrpcUpstream(up.cast(UpstreamsConfig.GrpcConnection::class.java), options)
val options = up.options ?: UpstreamsConfig.PartialOptions()
buildGrpcUpstream(up.cast(UpstreamsConfig.GrpcConnection::class.java), options.build())
} else {
val chain = Global.chainById(up.chain)
if (chain == Chain.UNSPECIFIED) {
log.error("Chain is unknown: ${up.chain}")
return@forEach
}
val options = (defaultOptions[chain] ?: UpstreamsConfig.Options.getDefaults())
.merge(up.options ?: UpstreamsConfig.Options())
val options = (defaultOptions[chain] ?: UpstreamsConfig.PartialOptions.getDefaults())
.merge(up.options ?: UpstreamsConfig.PartialOptions())
when (BlockchainType.from(chain)) {
BlockchainType.ETHEREUM -> {
buildEthereumUpstream(up.cast(UpstreamsConfig.EthereumConnection::class.java), chain, options)
buildEthereumUpstream(up.cast(UpstreamsConfig.EthereumConnection::class.java), chain, options.build())
}
BlockchainType.BITCOIN -> {
buildBitcoinUpstream(up.cast(UpstreamsConfig.BitcoinConnection::class.java), chain, options)
buildBitcoinUpstream(up.cast(UpstreamsConfig.BitcoinConnection::class.java), chain, options.build())
}
else -> {
log.error("Chain is unsupported: ${up.chain}")
Expand All @@ -102,8 +102,8 @@ open class ConfiguredUpstreams(
}
}

private fun buildDefaultOptions(config: UpstreamsConfig): HashMap<Chain, UpstreamsConfig.Options> {
val defaultOptions = HashMap<Chain, UpstreamsConfig.Options>()
private fun buildDefaultOptions(config: UpstreamsConfig): HashMap<Chain, UpstreamsConfig.PartialOptions> {
val defaultOptions = HashMap<Chain, UpstreamsConfig.PartialOptions>()
config.defaultOptions.forEach { defaultsConfig ->
defaultsConfig.chains?.forEach { chainName ->
Global.chainById(chainName).let { chain ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ abstract class Multistream(

// TODO options for multistream are useless
override fun getOptions(): UpstreamsConfig.Options {
return UpstreamsConfig.Options()
throw RuntimeException("No options for multistream")
}

// TODO roles for multistream are useless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ open class EthereumRpcUpstream(
constructor(id: String, chain: Chain, forkWatch: ForkWatch, api: Reader<JsonRpcRequest, JsonRpcResponse>) :
this(
id, chain, forkWatch, api, null,
UpstreamsConfig.Options.getDefaults(), UpstreamsConfig.UpstreamRole.PRIMARY,
UpstreamsConfig.PartialOptions.getDefaults().build(), UpstreamsConfig.UpstreamRole.PRIMARY,
QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels()),
DirectCallMethods()
)
Expand All @@ -62,7 +62,7 @@ open class EthereumRpcUpstream(
override fun start() {
log.info("Configured for ${chain.chainName}")
super.start()
if (getOptions().disableValidation != null && getOptions().disableValidation!!) {
if (getOptions().disableValidation) {
log.warn("Disable validation for upstream ${this.getId()}")
this.setLag(0)
this.setStatus(UpstreamAvailability.OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.function.Tuple2
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException

Expand Down Expand Up @@ -110,7 +109,7 @@ open class EthereumUpstreamValidator(
}

fun start(): Flux<UpstreamAvailability> {
return Flux.interval(Duration.ofSeconds(options.validationInterval.toLong()))
return Flux.interval(options.validationInterval)
.subscribeOn(scheduler)
.flatMap {
validate()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BitcoinGrpcUpstream(
}

constructor(parentId: String, role: UpstreamsConfig.UpstreamRole, chain: Chain, remote: ReactorBlockchainStub, client: JsonRpcGrpcClient) :
this(parentId, ForkWatch.Never(), role, chain, UpstreamsConfig.Options.getDefaults(), remote, client)
this(parentId, ForkWatch.Never(), role, chain, UpstreamsConfig.PartialOptions.getDefaults().build(), remote, client)

private val extractBlock = ExtractBlock()
private val defaultReader: Reader<JsonRpcRequest, JsonRpcResponse> = client.forSelector(Selector.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ open class EthereumGrpcUpstream(
Lifecycle {

constructor(parentId: String, role: UpstreamsConfig.UpstreamRole, chain: Chain, remote: ReactorBlockchainStub, client: JsonRpcGrpcClient) :
this(parentId, ForkWatch.Never(), role, chain, UpstreamsConfig.Options.getDefaults(), remote, client)
this(parentId, ForkWatch.Never(), role, chain, UpstreamsConfig.PartialOptions.getDefaults().build(), remote, client)

private val blockConverter: Function<BlockchainOuterClass.ChainHead, BlockContainer> = Function { value ->
val block = BlockContainer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class GrpcUpstreams(
) {
private val log = LoggerFactory.getLogger(GrpcUpstreams::class.java)

var options = UpstreamsConfig.Options.getDefaults()
var options = UpstreamsConfig.PartialOptions.getDefaults().build()

private var client: ReactorBlockchainGrpc.ReactorBlockchainStub? = null
private val known = HashMap<Chain, DefaultUpstream>()
Expand Down
Loading

0 comments on commit 10ec822

Please sign in to comment.