From 3aa54cc184460007f89a6f86850f9a75a511e947 Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Sat, 27 Mar 2021 15:16:58 -0400 Subject: [PATCH] problem: doesn't get head updates from all available heads solution: ensure all heads are started on multihead start rel: #67 --- .../dshackle/upstream/MergedHead.kt | 7 ++ .../upstream/ethereum/EthereumMultistream.kt | 3 +- .../upstream/ethereum/EthereumWsHead.kt | 1 + .../dshackle/upstream/MergedHeadSpec.groovy | 67 +++++++++++++++++++ .../testing/trial/proxy/DispatchSpec.groovy | 21 ++++++ 5 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt index 96c85444f..7f29197d6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt @@ -33,11 +33,18 @@ class MergedHead( } override fun start() { + sources.forEach { head -> + if (head is Lifecycle && !head.isRunning) { + head.start() + } + } + subscription?.dispose() subscription = super.follow(Flux.merge(sources.map { it.getFlux() })) } override fun stop() { subscription?.dispose() + subscription = null } override fun setCaches(caches: Caches) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt index 7c56be787..4efc6fecc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumMultistream.kt @@ -91,7 +91,8 @@ open class EthereumMultistream( upstream.setLag(0) upstream.getHead() } else { - val newHead = MergedHead(upstreams.map { it.getHead() }).apply { + val heads = upstreams.map { it.getHead() } + val newHead = MergedHead(heads).apply { this.start() } val lagObserver = EthereumHeadLagObserver(newHead, upstreams as Collection).apply { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt index 90a5410be..26ad786c7 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumWsHead.kt @@ -33,6 +33,7 @@ class EthereumWsHead( } override fun start() { + this.subscription?.dispose() this.subscription = super.follow(ws.getFlux()) } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy new file mode 100644 index 000000000..217f2f740 --- /dev/null +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/MergedHeadSpec.groovy @@ -0,0 +1,67 @@ +/** + * Copyright (c) 2021 EmeraldPay, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.emeraldpay.dshackle.upstream + +import org.springframework.context.Lifecycle +import reactor.core.publisher.Flux +import spock.lang.Specification + +class MergedHeadSpec extends Specification { + + def "ensures that heads are running on start"() { + setup: + def head1 = Stub(TestHead1) { + _ * getFlux() >> Flux.empty() + } + def head2 = Mock(TestHead2) { + _ * isRunning() >> true + _ * getFlux() >> Flux.empty() + } + def head3 = Mock(TestHead2) { + _ * isRunning() >> false + _ * getFlux() >> Flux.empty() + } + + when: + def merged = new MergedHead([head1, head2, head3]) + merged.start() + + then: + 1 * head3.start() + } + + class TestHead1 extends AbstractHead { + + } + + class TestHead2 extends AbstractHead implements Lifecycle { + + @Override + void start() { + + } + + @Override + void stop() { + + } + + @Override + boolean isRunning() { + return false + } + } +} diff --git a/testing/trial/src/test/groovy/io/emeraldpay/dshackle/testing/trial/proxy/DispatchSpec.groovy b/testing/trial/src/test/groovy/io/emeraldpay/dshackle/testing/trial/proxy/DispatchSpec.groovy index abae04301..9f0dd5623 100644 --- a/testing/trial/src/test/groovy/io/emeraldpay/dshackle/testing/trial/proxy/DispatchSpec.groovy +++ b/testing/trial/src/test/groovy/io/emeraldpay/dshackle/testing/trial/proxy/DispatchSpec.groovy @@ -28,6 +28,27 @@ class DispatchSpec extends Specification { calls1.size() + calls2.size() == 100 } + def "multiple calls routed roughly equal to upstreams - for latest block"() { + when: + def calls1before = ProxyClient.forOriginal(18545).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List + def calls2before = ProxyClient.forOriginal(18546).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List + + 100.times { + client.execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000001", data: "0x00000000" + Integer.toString(it, 16)], "0x100001"]) + } + + def calls1after = ProxyClient.forOriginal(18545).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List + def calls2after = ProxyClient.forOriginal(18546).execute("eth_call", [[to: "0x0123456789abcdef0123456789abcdef00000002"]]).result as List + + def calls1 = onlyNew(calls1before, calls1after) + def calls2 = onlyNew(calls2before, calls2after) + + then: + calls1.size() >= 48 + calls2.size() >= 48 + calls1.size() + calls2.size() == 100 + } + private List onlyNew(List before, List after) { return after.findAll { a -> !before.any { b ->