diff --git a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt index ddda94f6c..4598b0934 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/config/context/SchedulersConfig.kt @@ -35,6 +35,16 @@ open class SchedulersConfig { return makeScheduler("ws-connection-resubscribe-scheduler", 2, monitoringConfig) } + @Bean + open fun wsScheduler(monitoringConfig: MonitoringConfig): Scheduler { + return makeScheduler("ws-scheduler", 4, monitoringConfig) + } + + @Bean + open fun headLivenessScheduler(monitoringConfig: MonitoringConfig): Scheduler { + return makeScheduler("head-liveness-scheduler", 4, monitoringConfig) + } + @Bean open fun grpcChannelExecutor(monitoringConfig: MonitoringConfig): Executor { return makePool("grpc-client-channel", 10, monitoringConfig) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index c59e35427..23036db6b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -71,7 +71,6 @@ import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Component import reactor.core.scheduler.Scheduler import reactor.core.scheduler.Schedulers -import java.lang.IllegalStateException import java.net.URI import java.util.concurrent.Executor import java.util.concurrent.Executors @@ -95,6 +94,8 @@ open class ConfiguredUpstreams( private val clientSpansInterceptor: ClientInterceptor?, @Qualifier("headScheduler") private val headScheduler: Scheduler, + private val wsScheduler: Scheduler, + private val headLivenessScheduler: Scheduler, private val authorizationConfig: AuthorizationConfig, private val grpcAuthContext: GrpcAuthContext, ) : ApplicationRunner { @@ -258,7 +259,7 @@ open class ConfiguredUpstreams( options, config.role, methods, - QuorumForLabels.QuorumItem(1, config.labels), + QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)), chainConfig, connectorFactory, eventPublisher, @@ -388,7 +389,7 @@ open class ConfiguredUpstreams( chain, endpoint.url, endpoint.origin ?: URI("http://localhost"), - headScheduler, + wsScheduler, ).apply { config = endpoint basicAuth = endpoint.basicAuth @@ -424,6 +425,7 @@ open class ConfiguredUpstreams( blockValidator, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, chainsConf.expectedBlockTime, ) if (!connectorFactory.isValid()) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt index 3ab362dee..1cc85978b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/AbstractHead.kt @@ -168,6 +168,10 @@ abstract class AbstractHead @JvmOverloads constructor( // NOOP } + override fun headLiveness(): Flux { + return Flux.empty() + } + override fun start() { stopping = false log.debug("Start ${this.javaClass.simpleName} $upstreamId") diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 8a4996a1f..bc896c566 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -56,6 +56,9 @@ abstract class DefaultUpstream( private val statusStream = Sinks.many() .multicast() .directBestEffort() + protected val stateStream: Sinks.Many = Sinks.many() + .multicast() + .directBestEffort() init { if (id.length < 3 || !id.matches(Regex("[a-zA-Z][a-zA-Z0-9_-]+[a-zA-Z0-9]"))) { @@ -106,6 +109,10 @@ abstract class DefaultUpstream( return statusStream.asFlux().distinctUntilChanged() } + override fun observeState(): Flux { + return stateStream.asFlux() + } + override fun setLag(lag: Long) { lag.coerceAtLeast(0).let { nLag -> status.updateAndGet { curr -> diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt index a67bdb2e6..dac5b4d98 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt @@ -39,4 +39,6 @@ class EmptyHead : Head { override fun onSyncingNode(isSyncing: Boolean) { } + + override fun headLiveness(): Flux = Flux.empty() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt index d3e133b1f..1ba599b3e 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt @@ -43,4 +43,6 @@ interface Head { fun stop() fun onSyncingNode(isSyncing: Boolean) + + fun headLiveness(): Flux } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index 41ce2c572..7980918bd 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -265,6 +265,10 @@ abstract class Multistream( ).distinct() } + override fun observeState(): Flux { + return Flux.empty() + } + override fun isAvailable(): Boolean { return getAll().any { it.isAvailable() } } @@ -315,10 +319,14 @@ abstract class Multistream( .distinctUntilChanged { it.getId() }.flatMap { upstream -> - upstream.observeStatus().map { upstream } + val statusStream = upstream.observeStatus().map { upstream } + val stateStream = upstream.observeState().map { upstream } + Flux.merge(stateStream, statusStream) .takeUntilOther( subscribeRemovedUpstreams() - .filter { it.getId() == upstream.getId() }, + .filter { + it.getId() == upstream.getId() + }, ) } .subscribe { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt index 817832349..422f423de 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Upstream.kt @@ -26,6 +26,7 @@ interface Upstream : Lifecycle { fun isAvailable(): Boolean fun getStatus(): UpstreamAvailability fun observeStatus(): Flux + fun observeState(): Flux fun getHead(): Head /** diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EnrichedMergedHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EnrichedMergedHead.kt index 1cfb1cfb9..b6a4b6ec4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EnrichedMergedHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EnrichedMergedHead.kt @@ -91,4 +91,6 @@ class EnrichedMergedHead constructor( } override fun onSyncingNode(isSyncing: Boolean) {} + + override fun headLiveness(): Flux = Flux.empty() } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt index 3428023d7..2f9bc3de6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumChainSpecific.kt @@ -39,6 +39,8 @@ object EthereumChainSpecific : ChainSpecific { override fun latestBlockRequest() = JsonRpcRequest("eth_getBlockByNumber", listOf("latest", false)) override fun listenNewHeadsRequest(): JsonRpcRequest = JsonRpcRequest("eth_subscribe", listOf("newHeads")) + override fun unsubscribeNewHeadsRequest(subId: String): JsonRpcRequest = + JsonRpcRequest("eth_unsubscribe", listOf(subId)) override fun localReaderBuilder( cachingReader: CachingReader, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt index fded48743..7615cca4d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHead.kt @@ -21,7 +21,6 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.Lifecycle -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice import io.emeraldpay.dshackle.upstream.generic.ChainSpecific import io.emeraldpay.dshackle.upstream.generic.GenericHead @@ -32,6 +31,7 @@ import reactor.core.publisher.Sinks import reactor.core.scheduler.Scheduler import java.time.Duration import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicReference class GenericWsHead( forkChoice: ForkChoice, @@ -40,7 +40,7 @@ class GenericWsHead( private val wsSubscriptions: WsSubscriptions, private val wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, - private val upstream: DefaultUpstream, + upstream: DefaultUpstream, private val chainSpecific: ChainSpecific, ) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle { @@ -51,6 +51,9 @@ class GenericWsHead( private var subscription: Disposable? = null private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort() + private val headLivenessSink = Sinks.many().multicast().directBestEffort() + + private var subscriptionId = AtomicReference("") init { registerHeadResubscribeFlux() @@ -85,18 +88,14 @@ class GenericWsHead( fun listenNewHeads(): Flux { return subscribe() - .transform { - Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it) - } .map { chainSpecific.parseHeader(it, "unknown") } .timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads"))) .onErrorResume { log.error("Error getting heads for $upstreamId - ${it.message}") - upstream.setStatus(UpstreamAvailability.UNAVAILABLE) subscribed = false - Mono.empty() + unsubscribe() } } @@ -106,6 +105,19 @@ class GenericWsHead( noHeadUpdatesSink.tryEmitComplete() } + override fun headLiveness(): Flux = headLivenessSink.asFlux() + + private fun unsubscribe(): Mono { + return wsSubscriptions.unsubscribe(chainSpecific.unsubscribeNewHeadsRequest(subscriptionId.get()).copy(id = ids.getAndIncrement())) + .flatMap { it.requireResult() } + .doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) } + .onErrorResume { + log.error("{} couldn't unsubscribe from newHeads", upstreamId, it) + Mono.empty() + } + .then(Mono.empty()) + } + private val ids = AtomicInteger(1) private fun subscribe(): Flux { @@ -113,6 +125,7 @@ class GenericWsHead( wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement())) .also { connectionId = it.connectionId + subscriptionId = it.subId if (!connected) { connected = true } @@ -126,6 +139,7 @@ class GenericWsHead( val connectionStates = wsSubscriptions.connectionInfoFlux() .map { if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) { + headLivenessSink.emitNext(false) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } subscribed = false connected = false connectionId = null diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt index c3136c958..1677130c2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidator.kt @@ -19,8 +19,9 @@ class HeadLivenessValidator( } fun getFlux(): Flux { + val headLiveness = head.headLiveness() // first we have moving window of 2 blocks and check that they are consecutive ones - return head.getFlux().map { it.height }.buffer(2, 1).map { + val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map { it.last() - it.first() == 1L }.scan(Pair(0, true)) { acc, value -> // then we accumulate consecutive true events, false resets counter @@ -52,5 +53,7 @@ class HeadLivenessValidator( } }, ).repeat().subscribeOn(scheduler) + + return Flux.merge(headFlux, headLiveness) } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionFactory.kt index dcec296f6..41de1f150 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionFactory.kt @@ -43,8 +43,8 @@ open class WsConnectionFactory( ) } - open fun createWsConnection(connIndex: Int = 0, onDisconnect: () -> Unit): WsConnection = - WsConnectionImpl(uri, origin, basicAuth, metrics(connIndex), onDisconnect, scheduler).also { ws -> + open fun createWsConnection(connIndex: Int = 0): WsConnection = + WsConnectionImpl(uri, origin, basicAuth, metrics(connIndex), scheduler).also { ws -> config?.frameSize?.let { ws.frameSize = it } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt index c5893e5de..d18129de6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImpl.kt @@ -67,7 +67,6 @@ open class WsConnectionImpl( private val origin: URI, private val basicAuth: AuthConfig.ClientBasicAuth?, private val rpcMetrics: RpcMetrics?, - private val onDisconnect: () -> Unit, private val scheduler: Scheduler, ) : AutoCloseable, WsConnection, Cloneable { @@ -198,7 +197,6 @@ open class WsConnectionImpl( connection = HttpClient.create() .resolver(DefaultAddressResolverGroup.INSTANCE) .doOnDisconnected { - onDisconnect() disconnects.tryEmitNext(Instant.now()) log.info("Disconnected from $uri") if (keepConnection) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt index c072ca379..408259346 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPool.kt @@ -16,8 +16,6 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.Global -import io.emeraldpay.dshackle.upstream.DefaultUpstream -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import org.springframework.util.backoff.BackOffExecution import org.springframework.util.backoff.ExponentialBackOff import reactor.core.Disposable @@ -39,7 +37,6 @@ import kotlin.concurrent.write */ class WsConnectionMultiPool( private val wsConnectionFactory: WsConnectionFactory, - private val upstream: DefaultUpstream, private val connections: Int, ) : WsConnectionPool { @@ -110,16 +107,14 @@ class WsConnectionMultiPool( SCHEDULE_FULL } else { current.add( - wsConnectionFactory.createWsConnection(connIndex++) { - if (isUnavailable()) { - upstream.setStatus(UpstreamAvailability.UNAVAILABLE) - } - }.also { - it.connect() - connectionSubscriptionMap[it.connectionId()] = it.connectionInfoFlux().subscribe { info -> - connectionInfo.emitNext(info) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } - } - }, + wsConnectionFactory.createWsConnection(connIndex++) + .also { + it.connect() + connectionSubscriptionMap[it.connectionId()] = it.connectionInfoFlux() + .subscribe { info -> + connectionInfo.emitNext(info) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } + } + }, ) SCHEDULE_GROW } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionPoolFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionPoolFactory.kt index fc5b14b53..71a580c44 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionPoolFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionPoolFactory.kt @@ -29,9 +29,9 @@ class WsConnectionPoolFactory( "Creating instance for different upstream. ${upstream.getId()} != id" } return if (connections > 1) { - WsConnectionMultiPool(wsConnectionFactory, upstream, connections) + WsConnectionMultiPool(wsConnectionFactory, connections) } else { - WsConnectionSinglePool(wsConnectionFactory, upstream) + WsConnectionSinglePool(wsConnectionFactory) } } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt index 3a1dd14af..7d792f5a2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionSinglePool.kt @@ -15,17 +15,12 @@ */ package io.emeraldpay.dshackle.upstream.ethereum -import io.emeraldpay.dshackle.upstream.DefaultUpstream -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import reactor.core.publisher.Flux class WsConnectionSinglePool( wsConnectionFactory: WsConnectionFactory, - private val upstream: DefaultUpstream, ) : WsConnectionPool { - private val connection = wsConnectionFactory.createWsConnection { - upstream.setStatus(UpstreamAvailability.UNAVAILABLE) - } + private val connection = wsConnectionFactory.createWsConnection() override fun connect() { if (!connection.isConnected) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt index ad8cfe235..0442f1353 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptions.kt @@ -16,7 +16,10 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicReference /** * A JSON-RPC Subscription client. @@ -42,8 +45,11 @@ interface WsSubscriptions { fun connectionInfoFlux(): Flux + fun unsubscribe(request: JsonRpcRequest): Mono + data class SubscribeData( val data: Flux, val connectionId: String, + val subId: AtomicReference, ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt index 8712282e0..7a79898a9 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/WsSubscriptionsImpl.kt @@ -17,6 +17,7 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import org.slf4j.LoggerFactory import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -49,7 +50,15 @@ class WsSubscriptionsImpl( } } - return WsSubscriptions.SubscribeData(messageFlux, conn.connectionId()) + return WsSubscriptions.SubscribeData(messageFlux, conn.connectionId(), subscriptionId) + } + + override fun unsubscribe(request: JsonRpcRequest): Mono { + if (request.params.isEmpty() || request.params.contains("")) { + return Mono.empty() + } + return wsPool.getConnection() + .callRpc(request) } override fun connectionInfoFlux(): Flux = diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt index d8fac2f0b..61b8b835c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/ChainSpecific.kt @@ -43,6 +43,8 @@ interface ChainSpecific { fun listenNewHeadsRequest(): JsonRpcRequest + fun unsubscribeNewHeadsRequest(subId: String): JsonRpcRequest + fun localReaderBuilder(cachingReader: CachingReader, methods: CallMethods, head: Head): Mono fun subscriptionBuilder(headScheduler: Scheduler): (Multistream) -> EgressSubscription diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt index bfa0fbb7c..51668f4f8 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/GenericUpstream.kt @@ -25,6 +25,7 @@ import org.springframework.context.ApplicationEventPublisher import org.springframework.context.Lifecycle import reactor.core.Disposable import reactor.core.publisher.Flux +import reactor.core.publisher.Sinks import java.time.Duration import java.util.concurrent.atomic.AtomicBoolean @@ -165,7 +166,7 @@ open class GenericUpstream( } livenessSubscription = connector.hasLiveSubscriptionHead().subscribe({ hasLiveSubscriptionHead.set(it) - eventPublisher?.publishEvent(UpstreamChangeEvent(chain, this, UpstreamChangeEvent.ChangeType.UPDATED)) + stateStream.emitNext(true) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } }, { log.debug("Error while checking live subscription for ${getId()}", it) },) @@ -197,5 +198,5 @@ open class GenericUpstream( return connector.getIngressSubscription() } - override fun isRunning() = connector.isRunning() + override fun isRunning() = connector.isRunning() && validationSettingsSubscription == null } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt index 3d7c447e5..40a35a7a4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericConnectorFactory.kt @@ -22,6 +22,7 @@ open class GenericConnectorFactory( private val blockValidator: BlockValidator, private val wsConnectionResubscribeScheduler: Scheduler, private val headScheduler: Scheduler, + private val headLivenessScheduler: Scheduler, private val expectedBlockTime: Duration, ) : ConnectorFactory { @@ -58,6 +59,7 @@ open class GenericConnectorFactory( blockValidator, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, expectedBlockTime, specific, ) @@ -74,6 +76,7 @@ open class GenericConnectorFactory( blockValidator, wsConnectionResubscribeScheduler, headScheduler, + headLivenessScheduler, expectedBlockTime, specific, ) diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt index 165fb2370..e73f0b432 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericRpcConnector.kt @@ -39,6 +39,7 @@ class GenericRpcConnector( blockValidator: BlockValidator, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, + headLivenessScheduler: Scheduler, expectedBlockTime: Duration, chainSpecific: ChainSpecific, ) : GenericConnector, CachesEnabled { @@ -52,7 +53,7 @@ class GenericRpcConnector( } override fun hasLiveSubscriptionHead(): Flux { - return liveness.getFlux() + return liveness.getFlux().distinctUntilChanged() } init { @@ -107,7 +108,7 @@ class GenericRpcConnector( ) } } - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, id) + liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, id) } override fun setCaches(caches: Caches) { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt index d0f268dcd..6ee26980a 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/generic/connectors/GenericWsConnector.kt @@ -24,6 +24,7 @@ class GenericWsConnector( blockValidator: BlockValidator, wsConnectionResubscribeScheduler: Scheduler, headScheduler: Scheduler, + headLivenessScheduler: Scheduler, expectedBlockTime: Duration, chainSpecific: ChainSpecific, ) : GenericConnector { @@ -46,12 +47,12 @@ class GenericWsConnector( upstream, chainSpecific, ) - liveness = HeadLivenessValidator(head, expectedBlockTime, headScheduler, upstream.getId()) + liveness = HeadLivenessValidator(head, expectedBlockTime, headLivenessScheduler, upstream.getId()) subscriptions = chainSpecific.makeIngressSubscription(wsSubscriptions) } override fun hasLiveSubscriptionHead(): Flux { - return liveness.getFlux() + return liveness.getFlux().distinctUntilChanged() } override fun start() { pool.connect() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt index 89eb81a0a..17ef94196 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/polkadot/PolkadotChainSpecific.kt @@ -65,6 +65,9 @@ object PolkadotChainSpecific : ChainSpecific { override fun listenNewHeadsRequest(): JsonRpcRequest = JsonRpcRequest("chain_subscribeNewHeads", listOf()) + override fun unsubscribeNewHeadsRequest(subId: String): JsonRpcRequest = + JsonRpcRequest("chain_unsubscribeNewHeads", listOf(subId)) + override fun localReaderBuilder( cachingReader: CachingReader, methods: CallMethods, diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt index bc42903a0..abbd07a3d 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/starknet/StarknetChainSpecific.kt @@ -62,6 +62,10 @@ object StarknetChainSpecific : ChainSpecific { throw NotImplementedError() } + override fun unsubscribeNewHeadsRequest(subId: String): JsonRpcRequest { + throw NotImplementedError() + } + override fun localReaderBuilder( cachingReader: CachingReader, methods: CallMethods, diff --git a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy index c3b66f95c..37778a791 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/startup/ConfiguredUpstreamsSpec.groovy @@ -35,6 +35,8 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -69,6 +71,8 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -102,6 +106,8 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -130,6 +136,8 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) @@ -163,6 +171,8 @@ class ConfiguredUpstreamsSpec extends Specification { Schedulers.boundedElastic(), null, Schedulers.boundedElastic(), + Schedulers.boundedElastic(), + Schedulers.boundedElastic(), AuthorizationConfig.default(), new GrpcAuthContext() ) diff --git a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy index ac02a4fbe..76ef2429a 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/test/EthereumHeadMock.groovy @@ -82,4 +82,9 @@ class EthereumHeadMock implements Head { void onSyncingNode(boolean isSyncing) { } + + @Override + Flux headLiveness() { + return Flux.empty() + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy index 780c849e2..a7420f87b 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/FilteredApisSpec.groovy @@ -64,6 +64,7 @@ class FilteredApisSpec extends Specification { BlockValidator.ALWAYS_VALID, Schedulers.boundedElastic(), Schedulers.boundedElastic(), + Schedulers.boundedElastic(), Duration.ofSeconds(12) ) new GenericUpstream( diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy index 23268d8a3..df0940777 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/GenericWsHeadSpec.groovy @@ -24,10 +24,10 @@ import io.emeraldpay.dshackle.upstream.BlockValidator import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson import io.emeraldpay.dshackle.upstream.forkchoice.AlwaysForkChoice +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest +import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse import io.emeraldpay.etherjar.domain.BlockHash -import io.emeraldpay.etherjar.domain.TransactionId import io.emeraldpay.etherjar.rpc.json.TransactionRefJson -import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks @@ -38,6 +38,7 @@ import spock.lang.Specification import java.time.Duration import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.concurrent.atomic.AtomicReference class GenericWsHeadSpec extends Specification { @@ -74,7 +75,7 @@ class GenericWsHeadSpec extends Specification { act == res 1 * ws.subscribe(_) >> new WsSubscriptions.SubscribeData( - Flux.fromIterable([headBlock]), "id" + Flux.fromIterable([headBlock]), "id", new AtomicReference("") ) } @@ -96,8 +97,8 @@ class GenericWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 2 * subscribe(_) >>> [ - new WsSubscriptions.SubscribeData(Flux.error(new RuntimeException()), "id"), - new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id") + new WsSubscriptions.SubscribeData(Flux.error(new RuntimeException()), "id", new AtomicReference("")), + new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference("")) ] } @@ -150,8 +151,8 @@ class GenericWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 2 * subscribe(_) >>> [ - new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), - new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id") + new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id", new AtomicReference("")), + new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference("")) ] } @@ -191,7 +192,7 @@ class GenericWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 1 * subscribe(_) >>> [ - new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), + new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id", new AtomicReference("")), ] } @@ -230,7 +231,7 @@ class GenericWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 1 * subscribe(_) >>> [ - new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), + new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id", new AtomicReference("")), ] } @@ -282,8 +283,8 @@ class GenericWsHeadSpec extends Specification { def ws = Mock(WsSubscriptions) { 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() 2 * subscribe(_) >>> [ - new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id"), - new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id"), + new WsSubscriptions.SubscribeData(Flux.fromIterable([firstHeadBlock]), "id", new AtomicReference("")), + new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference("")), ] } @@ -316,4 +317,74 @@ class GenericWsHeadSpec extends Specification { .thenCancel() .verify(Duration.ofSeconds(1)) } + + def "Unsubscribe if there is an error during subscription"() { + setup: + def block = new BlockJson() + block.number = 100 + block.hash = BlockHash.from("0x3ec2ebf5d0ec474d0ac6bc50d2770d8409ad76e119968e7919f85d5ec8915200") + block.parentHash = parent + block.timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) + block.uncles = [] + block.totalDifficulty = BigInteger.ONE + + def apiMock = TestingCommons.api() + def subId = "subId" + def ws = Mock(WsSubscriptions) { + 1 * it.connectionInfoFlux() >> Flux.empty() + 1 * it.subscribe(_) >> new WsSubscriptions.SubscribeData( + Flux.error(new RuntimeException()), "id", new AtomicReference(subId) + ) + 1 * it.unsubscribe(new JsonRpcRequest("eth_unsubscribe", List.of(subId), 2, null, null)) >> + Mono.just(new JsonRpcResponse("".bytes, null)) + } + + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + + when: + def act = head.listenNewHeads() + + then: + StepVerifier.create(act) + .expectComplete() + .verify(Duration.ofSeconds(1)) + } + + def "If there is ws disconnect then head must emit false its liveness state"() { + setup: + def secondBlock = new BlockJson() + secondBlock.parentHash = parent + secondBlock.timestamp = Instant.now().truncatedTo(ChronoUnit.SECONDS) + secondBlock.number = 105 + secondBlock.hash = BlockHash.from("0x29229361dc5aa1ec66c323dc7a299e2b61a8c8dd2a3522d41255ec10eca25dd8") + + def secondHeadBlock = secondBlock.with { + Global.objectMapper.writeValueAsBytes(it) + } + + def apiMock = TestingCommons.api() + + def connectionInfoSink = Sinks.many().multicast().directBestEffort() + def ws = Mock(WsSubscriptions) { + 1 * it.connectionInfoFlux() >> connectionInfoSink.asFlux() + 1 * subscribe(_) >>> [ + new WsSubscriptions.SubscribeData(Flux.fromIterable([secondHeadBlock]), "id", new AtomicReference("")) + ] + } + + def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE) + + when: + head.start() + def liveness = head.headLiveness() + + then: + StepVerifier.create(liveness) + .then { + connectionInfoSink.tryEmitNext(new WsConnection.ConnectionInfo("id", WsConnection.ConnectionState.DISCONNECTED)) + } + .expectNext(false) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy index 3c7ff7540..3961c8d57 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/HeadLivenessValidatorSpec.groovy @@ -2,13 +2,13 @@ package io.emeraldpay.dshackle.upstream.ethereum import io.emeraldpay.dshackle.test.EthereumHeadMock import io.emeraldpay.dshackle.test.TestingCommons +import io.emeraldpay.dshackle.upstream.Head +import reactor.core.publisher.Flux import reactor.core.scheduler.Schedulers import reactor.test.StepVerifier import spock.lang.Specification import java.time.Duration -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicInteger class HeadLivenessValidatorSpec extends Specification{ def "emits true"() { @@ -24,6 +24,20 @@ class HeadLivenessValidatorSpec extends Specification{ }.expectNext(true).thenCancel().verify(Duration.ofSeconds(1)) } + def "emits false if head liveness emits false"() { + when: + def head = Mock(Head) { + 1 * it.headLiveness() >> Flux.just(false) + 1 * it.getFlux() >> Flux.just(TestingCommons.blockForEthereum(1)) + } + def checker = new HeadLivenessValidator(head, Duration.ofSeconds(10), Schedulers.boundedElastic(), "test") + then: + StepVerifier.create(checker.flux) + .expectNext(false) + .thenCancel() + .verify(Duration.ofSeconds(1)) + } + def "starts accumulating trues but immediately emits after false"() { when: def head = new EthereumHeadMock() diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy index 0d1d35dae..f44a9c9f8 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionImplRealSpec.groovy @@ -5,7 +5,6 @@ import io.emeraldpay.dshackle.test.GenericUpstreamMock import io.emeraldpay.dshackle.test.MockWSServer import io.emeraldpay.dshackle.test.TestingCommons import io.emeraldpay.dshackle.upstream.DefaultUpstream -import io.emeraldpay.dshackle.upstream.UpstreamAvailability import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest import reactor.core.scheduler.Schedulers import reactor.test.StepVerifier @@ -110,33 +109,6 @@ class WsConnectionImplRealSpec extends Specification { .verify(Duration.ofSeconds(1)) } - def "Gets UNAVAIL status right after disconnect"() { - setup: - def up = Mock(DefaultUpstream) { - _ * getId() >> "test" - } - conn = new WsConnectionPoolFactory( - "test", - 1, - new WsConnectionFactory( - "test", - Chain.ETHEREUM__MAINNET, - "ws://localhost:${port}".toURI(), - "http://localhost:${port}".toURI(), - Schedulers.boundedElastic() - ) - ).create(up).getConnection() - when: - conn.connect() - conn.reconnectIntervalSeconds = 10 - Thread.sleep(SLEEP) - server.stop() - Thread.sleep(100) - - then: - 1 * up.setStatus(UpstreamAvailability.UNAVAILABLE) - } - def "Try to connects to server until it's available"() { when: server.stop() diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPoolSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPoolSpec.groovy index cc1897312..32cfd40e1 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPoolSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/WsConnectionMultiPoolSpec.groovy @@ -30,14 +30,14 @@ class WsConnectionMultiPoolSpec extends Specification { } def up = Mock(DefaultUpstream) def factory = Mock(WsConnectionFactory) - def pool = new WsConnectionMultiPool(factory, up, 3) + def pool = new WsConnectionMultiPool(factory, 3) pool.scheduler = Stub(ScheduledExecutorService) when: pool.connect() then: - 1 * factory.createWsConnection(0, _) >> conn + 1 * factory.createWsConnection(0) >> conn 1 * conn.connect() } @@ -54,14 +54,14 @@ class WsConnectionMultiPoolSpec extends Specification { } def up = Mock(DefaultUpstream) def factory = Mock(WsConnectionFactory) - def pool = new WsConnectionMultiPool(factory, up, 3) + def pool = new WsConnectionMultiPool(factory, 3) pool.scheduler = Stub(ScheduledExecutorService) when: pool.connect() then: - 1 * factory.createWsConnection(0, _) >> conn1 + 1 * factory.createWsConnection(0) >> conn1 1 * conn1.connect() when: @@ -69,7 +69,7 @@ class WsConnectionMultiPoolSpec extends Specification { then: 1 * conn1.isConnected() >> true - 1 * factory.createWsConnection(1, _) >> conn2 + 1 * factory.createWsConnection(1) >> conn2 1 * conn2.connect() when: @@ -78,7 +78,7 @@ class WsConnectionMultiPoolSpec extends Specification { then: 1 * conn1.isConnected() >> true 1 * conn2.isConnected() >> true - 1 * factory.createWsConnection(2, _) >> conn3 + 1 * factory.createWsConnection(2) >> conn3 1 * conn3.connect() when: @@ -88,7 +88,7 @@ class WsConnectionMultiPoolSpec extends Specification { 1 * conn1.isConnected() >> true 1 * conn2.isConnected() >> true 1 * conn3.isConnected() >> true - 0 * factory.createWsConnection(_, _) + 0 * factory.createWsConnection(_) } def "recreate connection after failure"() { @@ -107,7 +107,7 @@ class WsConnectionMultiPoolSpec extends Specification { } def up = Mock(DefaultUpstream) def factory = Mock(WsConnectionFactory) - def pool = new WsConnectionMultiPool(factory, up, 3) + def pool = new WsConnectionMultiPool(factory, 3) pool.scheduler = Stub(ScheduledExecutorService) when: "initial fill" @@ -119,9 +119,9 @@ class WsConnectionMultiPoolSpec extends Specification { _ * conn1.isConnected() >> true _ * conn2.isConnected() >> true _ * conn3.isConnected() >> true - 1 * factory.createWsConnection(0, _) >> conn1 - 1 * factory.createWsConnection(1, _) >> conn2 - 1 * factory.createWsConnection(2, _) >> conn3 + 1 * factory.createWsConnection(0) >> conn1 + 1 * factory.createWsConnection(1) >> conn2 + 1 * factory.createWsConnection(2) >> conn3 1 * conn1.connect() 1 * conn2.connect() 1 * conn3.connect() @@ -133,7 +133,7 @@ class WsConnectionMultiPoolSpec extends Specification { 1 * conn1.isConnected() >> true 1 * conn2.isConnected() >> true 1 * conn3.isConnected() >> true - 0 * factory.createWsConnection(_, _) + 0 * factory.createWsConnection(_) when: "one failed" pool.connect() @@ -142,7 +142,7 @@ class WsConnectionMultiPoolSpec extends Specification { (1.._) * conn1.isConnected() >> true (1.._) * conn2.isConnected() >> false (1.._) * conn3.isConnected() >> true - 0 * factory.createWsConnection(_, _) // doesn't create immediately, but schedules it for the next adjust + 0 * factory.createWsConnection(_) // doesn't create immediately, but schedules it for the next adjust 1 * conn2.close() when: "needs one more" @@ -151,7 +151,7 @@ class WsConnectionMultiPoolSpec extends Specification { then: 1 * conn1.isConnected() >> true 1 * conn3.isConnected() >> true - 1 * factory.createWsConnection(3, _) >> conn4 + 1 * factory.createWsConnection(3) >> conn4 1 * conn4.connect() } } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy index 4db0450fd..6e6d73f77 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/ethereum/subscribe/WebsocketPendingTxesSpec.groovy @@ -22,6 +22,7 @@ import reactor.core.publisher.Flux import spock.lang.Specification import java.time.Duration +import java.util.concurrent.atomic.AtomicReference class WebsocketPendingTxesSpec extends Specification { @@ -42,7 +43,7 @@ class WebsocketPendingTxesSpec extends Specification { then: 1 * ws.subscribe(new JsonRpcRequest("eth_subscribe", ["newPendingTransactions"])) >> new WsSubscriptions.SubscribeData( - Flux.fromIterable(responses), "id" + Flux.fromIterable(responses), "id", new AtomicReference("") ) txes.collect {it.toHex() } == [ "0xa61bab14fc9720ea8725622688c2f964666d7c2afdae38af7dad53f12f242d5c",