From 8de6a8ceb583433b5f5bafaabfc3e770b191c819 Mon Sep 17 00:00:00 2001 From: KirillPamPam Date: Wed, 15 Nov 2023 17:44:21 +0400 Subject: [PATCH] Fix reload config (#341) --- .../config/reload/ReloadConfigSetup.kt | 13 ++-- .../reload/ReloadConfigUpstreamService.kt | 21 ++++--- .../upstream/generic/GenericMultistream.kt | 4 -- .../config/reload/ReloadConfigTest.kt | 63 ++++++++++++++----- 4 files changed, 68 insertions(+), 33 deletions(-) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt index 47ad7f164..49b6d7ab3 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigSetup.kt @@ -74,10 +74,9 @@ class ReloadConfigSetup( ) val upstreamsToRemove = upstreamsAnalyzeData.removed - .plus(upstreamsAnalyzeData.reloaded) .filterNot { chainsToReload.contains(it.second) } + .toSet() val upstreamsToAdd = upstreamsAnalyzeData.added - .plus(upstreamsAnalyzeData.reloaded.map { it.first }) reloadConfigUpstreamService.reloadUpstreams(chainsToReload, upstreamsToRemove, upstreamsToAdd, newUpstreamsConfig) @@ -107,9 +106,12 @@ class ReloadConfigSetup( } } - val added = newUpstreamsMap.minus(currentUpstreamsMap.keys).mapTo(mutableSetOf()) { it.key.first } + val added = newUpstreamsMap + .minus(currentUpstreamsMap.keys) + .mapTo(mutableSetOf()) { it.key } + .plus(reloaded) - return UpstreamAnalyzeData(added, removed, reloaded) + return UpstreamAnalyzeData(added, removed.plus(reloaded)) } private fun analyzeDefaultOptions( @@ -158,8 +160,7 @@ class ReloadConfigSetup( } private data class UpstreamAnalyzeData( - val added: Set = emptySet(), + val added: Set> = emptySet(), val removed: Set> = emptySet(), - val reloaded: Set> = emptySet(), ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt index d64518887..c9b95c412 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigUpstreamService.kt @@ -16,27 +16,30 @@ open class ReloadConfigUpstreamService( fun reloadUpstreams( chainsToReload: Set, - upstreamsToRemove: List>, - upstreamsToAdd: Set, + upstreamsToRemove: Set>, + upstreamsToAdd: Set>, newUpstreamsConfig: UpstreamsConfig, ) { val usedChains = removeUpstreams(chainsToReload, upstreamsToRemove) - addUpstreams(newUpstreamsConfig, chainsToReload, upstreamsToAdd) + addUpstreams(newUpstreamsConfig, chainsToReload, upstreamsToAdd.map { it.first }.toSet()) + + usedChains.forEach { chain -> + val upstreamsToRemovePerChain = upstreamsToRemove.filter { it.second == chain } + val upstreamsToAddPerChain = upstreamsToAdd.filter { it.second == chain } - usedChains.forEach { - multistreamHolder.getUpstream(it) - .run { - if (!this.haveUpstreams() && this.isRunning()) { + if (upstreamsToAddPerChain.isEmpty() && upstreamsToRemovePerChain.isNotEmpty()) { + multistreamHolder.getUpstream(chain) + .run { this.stop() } - } + } } } private fun removeUpstreams( chainsToReload: Set, - upstreamsToRemove: List>, + upstreamsToRemove: Set>, ): Set { val usedChains = mutableSetOf() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt index 4d359a12f..5e9b03e83 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericMultistream.kt @@ -106,10 +106,6 @@ open class GenericMultistream( head.removeHead(upstreamId) } - override fun isRunning(): Boolean { - return super.isRunning() || cachingReader.isRunning() - } - override fun makeLagObserver(): HeadLagObserver = HeadLagObserver(head, upstreams, DistanceExtractor::extractPriorityDistance, headScheduler, 6).apply { start() diff --git a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt index dcc975526..4750c1ab1 100644 --- a/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt +++ b/src/test/kotlin/io/emeraldpay/dshackle/config/reload/ReloadConfigTest.kt @@ -1,9 +1,11 @@ package io.emeraldpay.dshackle.config.reload +import io.emeraldpay.dshackle.Chain import io.emeraldpay.dshackle.Chain.ETHEREUM__MAINNET import io.emeraldpay.dshackle.Chain.POLYGON__MAINNET import io.emeraldpay.dshackle.Config import io.emeraldpay.dshackle.FileResolver +import io.emeraldpay.dshackle.cache.Caches import io.emeraldpay.dshackle.config.MainConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.config.UpstreamsConfigReader @@ -11,9 +13,14 @@ import io.emeraldpay.dshackle.foundation.ChainOptionsReader import io.emeraldpay.dshackle.startup.ConfiguredUpstreams import io.emeraldpay.dshackle.startup.UpstreamChangeEvent import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder +import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.Multistream -import io.emeraldpay.dshackle.upstream.Upstream +import io.emeraldpay.dshackle.upstream.generic.ChainSpecificRegistry +import io.emeraldpay.dshackle.upstream.generic.GenericMultistream +import io.emeraldpay.dshackle.upstream.generic.GenericUpstream import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.kotlin.any @@ -22,9 +29,13 @@ import org.mockito.kotlin.mock import org.mockito.kotlin.never import org.mockito.kotlin.verify import org.mockito.kotlin.whenever +import org.springframework.cloud.sleuth.Tracer import org.springframework.util.ResourceUtils +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers import sun.misc.Signal import java.io.File +import java.util.concurrent.Executors class ReloadConfigTest { private val fileResolver = FileResolver(File("")) @@ -82,25 +93,28 @@ class ReloadConfigTest { mutableListOf(newConfig.upstreams[0], newConfig.upstreams[2]), ), ) + verify(msEth, never()).stop() + verify(msPoly, never()).stop() assertEquals(3, mainConfig.upstreams!!.upstreams.size) assertEquals(newConfig, mainConfig.upstreams) } @Test - fun `stop multistream if there are no upstreams left`() { + fun `stop multistream if all upstreams are removed from it`() { val up1 = upstream("local1") val up2 = upstream("local2") val up3 = upstream("local3") - val msEth = mock { - on { getAll() } doReturn listOf(up1, up2) - on { haveUpstreams() } doReturn false - on { isRunning() } doReturn true - } - val msPoly = mock { - on { getAll() } doReturn listOf(up3) - } + val msEth = multistream(ETHEREUM__MAINNET) + .also { + it.processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.ADDED)) + it.processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.ADDED)) + } + val msPoly = multistream(POLYGON__MAINNET) + .also { + it.processUpstreamsEvents(UpstreamChangeEvent(POLYGON__MAINNET, up3, UpstreamChangeEvent.ChangeType.ADDED)) + } val newConfigFile = ResourceUtils.getFile("classpath:configs/upstreams-changed-upstreams-removed.yaml") whenever(config.getConfigPath()).thenReturn(newConfigFile) @@ -121,15 +135,15 @@ class ReloadConfigTest { reloadConfig.handle(Signal("HUP")) - verify(msEth).processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up1, UpstreamChangeEvent.ChangeType.REMOVED)) - verify(msEth).processUpstreamsEvents(UpstreamChangeEvent(ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.REMOVED)) verify(configuredUpstreams).processUpstreams( UpstreamsConfig( newConfig.defaultOptions, mutableListOf(), ), ) - verify(msEth).stop() + + assertFalse(msEth.isRunning()) + assertTrue(msPoly.isRunning()) assertEquals(1, mainConfig.upstreams!!.upstreams.size) assertEquals(newConfig, mainConfig.upstreams) } @@ -153,8 +167,29 @@ class ReloadConfigTest { assertEquals(initialConfig, mainConfig.upstreams) } - private fun upstream(id: String): Upstream = + private fun multistream(chain: Chain): Multistream { + val cs = ChainSpecificRegistry.resolve(chain) + return GenericMultistream( + chain, + Schedulers.fromExecutor(Executors.newFixedThreadPool(6)), + null, + ArrayList(), + Caches.default(), + Schedulers.boundedElastic(), + cs.makeCachingReaderBuilder(mock()), + cs::localReaderBuilder, + cs.subscriptionBuilder(Schedulers.boundedElastic()), + ) + } + + private fun upstream(id: String): GenericUpstream = mock { + val head = mock { + on { getFlux() } doReturn Flux.empty() + } on { getId() } doReturn id + on { getHead() } doReturn head + on { observeState() } doReturn Flux.empty() + on { observeStatus() } doReturn Flux.empty() } }