diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt index e2b5dd4d0..590bbf1de 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt @@ -67,7 +67,7 @@ class SubscribeNodeStatus( ) // subscribe on head/status updates for just added upstreams - val multiStreamUpdates = Flux.merge( + val adds = Flux.merge( multistreams.all() .map { ms -> ms.subscribeAddedUpstreams() @@ -92,8 +92,19 @@ class SubscribeNodeStatus( } } ) + val updates = Flux.merge( + multistreams.all().map { ms -> + ms.subscribeUpdatedUpstreams().map { + NodeStatusResponse.newBuilder() + .setNodeId(it.getId()) + .setDescription(buildDescription(ms, it)) + .setStatus(buildStatus(it.getStatus(), it.getHead().getCurrentHeight())) + .build() + } + } + ) - return Flux.merge(upstreamUpdates, multiStreamUpdates, removals) + return Flux.merge(upstreamUpdates, adds, removals, updates) } private fun subscribeUpstreamUpdates( diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 68f5299bd..d160f1e2f 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -54,7 +54,7 @@ abstract class DefaultUpstream( private val status = AtomicReference(Status(defaultLag, defaultAvail, statusByLag(defaultLag, defaultAvail))) private val statusStream = Sinks.many() .multicast() - .directBestEffort() + .directBestEffort() init { if (id.length < 3 || !id.matches(Regex("[a-zA-Z][a-zA-Z0-9_-]+[a-zA-Z0-9]"))) { @@ -67,14 +67,9 @@ abstract class DefaultUpstream( } fun onStatus(value: BlockchainOuterClass.ChainStatus) { - this.onStatus(value, false) - } - - fun onStatus(value: BlockchainOuterClass.ChainStatus, stateChanged: Boolean = false) { val available = value.availability setStatus( if (available != null) UpstreamAvailability.fromGrpc(available.number) else UpstreamAvailability.UNAVAILABLE, - stateChanged ) } @@ -83,15 +78,11 @@ abstract class DefaultUpstream( } open fun setStatus(avail: UpstreamAvailability) { - this.setStatus(avail, false) - } - - open fun setStatus(avail: UpstreamAvailability, stateChanged: Boolean = false) { status.updateAndGet { curr -> Status(curr.lag, avail, statusByLag(curr.lag, avail)) }.also { statusStream.emitNext( - UpstreamChangeState(it.status, stateChanged) + it.status ) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } log.trace("Status of upstream [$id] changed to [$it], requested change status to [$avail]") } @@ -116,18 +107,7 @@ abstract class DefaultUpstream( } override fun observeStatus(): Flux { - return statusStream.asFlux() - .distinctUntilChanged( - { it }, - { prev, current -> - if (current.stateChanged) { - false - } else { - prev.status == current.status - } - } - ) - .map { it.status } + return statusStream.asFlux().distinctUntilChanged() } override fun setLag(lag: Long) { @@ -136,7 +116,7 @@ abstract class DefaultUpstream( Status(nLag, curr.avail, statusByLag(nLag, curr.avail)) }.also { statusStream.emitNext( - UpstreamChangeState(it.status, false) + it.status ) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } log.trace("Status of upstream [$id] changed to [$it], requested change lag to [$lag]") } @@ -171,9 +151,4 @@ abstract class DefaultUpstream( } data class Status(val lag: Long?, val avail: UpstreamAvailability, val status: UpstreamAvailability) - - private data class UpstreamChangeState( - val status: UpstreamAvailability, - val stateChanged: Boolean - ) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt index e20afede2..f29de96f2 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt @@ -38,7 +38,6 @@ import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.Sinks -import reactor.util.function.Tuples import java.time.Duration import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock @@ -80,9 +79,9 @@ abstract class Multistream( private val removedUpstreams = Sinks.many() .multicast() .directBestEffort() - private val stateStream = Sinks.many() + private val updateUpstreams = Sinks.many() .multicast() - .directBestEffort() + .directBestEffort() init { UpstreamAvailability.values().forEach { status -> @@ -206,16 +205,22 @@ abstract class Multistream( open fun onUpstreamsUpdated() { reconfigLock.withLock { val upstreams = getAll() - upstreams.map { it.getMethods() }.let { + upstreams.filter { it.isAvailable() }.map { it.getMethods() }.let { // TODO made list of uniq instances, and then if only one, just use it directly callMethods = AggregatedCallMethods(it) } capabilities = if (upstreams.isEmpty()) { emptySet() } else { - upstreams.map { up -> + upstreams.filter { it.isAvailable() }.map { up -> up.getCapabilities() - }.reduce { acc, curr -> acc + curr } + }.let { + if (it.isNotEmpty()) { + it.reduce { acc, curr -> acc + curr } + } else { + emptySet() + } + } } lagObserver?.stop() lagObserver = null @@ -281,26 +286,18 @@ abstract class Multistream( } private fun observeUpstreamsStatuses() { - stateStream.asFlux() - .subscribe { - upstreams.filter { it.isAvailable() }.map { it.getMethods() }.let { - callMethods = AggregatedCallMethods(it) - } - } - subscribeAddedUpstreams() - .filter { !it.isGrpc() } .distinctUntilChanged { it.getId() - }.map { - Tuples.of(it.getId(), it.observeStatus()) + }.flatMap { upstream -> + upstream.observeStatus().map { upstream } + .takeUntilOther( + subscribeRemovedUpstreams() + .filter { it.getId() == upstream.getId() } + ) } - .subscribe { pair -> - pair.t2.subscribe { status -> - stateStream.emitNext( - UpstreamChangeState(pair.t1, status) - ) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } - } + .subscribe { + onUpstreamChange(UpstreamChangeEvent(this.chain, it, UpstreamChangeEvent.ChangeType.UPDATED)) } } @@ -397,6 +394,7 @@ abstract class Multistream( UpstreamChangeEvent.ChangeType.REVALIDATED -> {} UpstreamChangeEvent.ChangeType.UPDATED -> { onUpstreamsUpdated() + updateUpstreams.emitNext(event.upstream) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED } } UpstreamChangeEvent.ChangeType.ADDED -> { if (!started) { @@ -441,9 +439,8 @@ abstract class Multistream( fun subscribeRemovedUpstreams(): Flux = removedUpstreams.asFlux() - - fun subscribeStateChanges(): Flux = - stateStream.asFlux() + fun subscribeUpdatedUpstreams(): Flux = + updateUpstreams.asFlux() abstract fun makeLagObserver(): HeadLagObserver diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt index 9423f8370..7260691eb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/BitcoinGrpcUpstream.kt @@ -171,7 +171,7 @@ class BitcoinGrpcUpstream( val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also { capabilities = newCapabilities } - conf.status?.let { status -> onStatus(status, upstreamStatusChanged) } + conf.status?.let { status -> onStatus(status) } return buildInfoChanged || upstreamStatusChanged } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt index 75a806751..f10eb59c4 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstream.kt @@ -156,7 +156,7 @@ open class EthereumGrpcUpstream( val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also { capabilities = newCapabilities } - conf.status?.let { status -> onStatus(status, upstreamStatusChanged) } + conf.status?.let { status -> onStatus(status) } val subsChanged = (conf.supportedSubscriptionsList != subscriptionTopics).also { subscriptionTopics = conf.supportedSubscriptionsList } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt index 3692a4e44..a72079484 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/EthereumPosGrpcUpstream.kt @@ -121,10 +121,10 @@ open class EthereumPosGrpcUpstream( val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also { capabilities = newCapabilities } - conf.status?.let { status -> onStatus(status, upstreamStatusChanged) } val subsChanged = (conf.supportedSubscriptionsList != subscriptionTopics).also { subscriptionTopics = conf.supportedSubscriptionsList } + conf.status?.let { status -> onStatus(status) } return buildInfoChanged || upstreamStatusChanged || subsChanged } diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy index d73c468d3..49a4691fe 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/MultistreamSpec.groovy @@ -260,45 +260,19 @@ class MultistreamSpec extends Specification { ms.onUpstreamChange( new UpstreamChangeEvent(Chain.ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.ADDED) ) - def states = ms.subscribeStateChanges() + up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_UNAVAILABLE)) + up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) + up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) then: - StepVerifier.create(states) - .then { - up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_UNAVAILABLE)) - up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) - up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) - } - .expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.UNAVAILABLE)) - .expectNext(new Multistream.UpstreamChangeState(up2.getId(), UpstreamAvailability.OK)) - .expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.OK)) - .then { - assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3") - } - .then { - up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_SYNCING)) - } - .expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.SYNCING)) - .then { - assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2") - } - .then { - up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) - } - .expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.OK)) - .then { - assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3") - } - .then { - up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) - } - .expectNextCount(0) - .then { - up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) - } - .expectNextCount(0) - .thenCancel() - .verify(Duration.ofSeconds(3)) - + assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3") + when: + up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_SYNCING)) + then: + assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2") + when: + up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK)) + then: + assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3") } def "Filter older blocks on multistream head"() { diff --git a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy index 26587d6ab..56f799296 100644 --- a/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy +++ b/src/test/groovy/io/emeraldpay/dshackle/upstream/grpc/EthereumGrpcUpstreamSpec.groovy @@ -256,56 +256,6 @@ class EthereumGrpcUpstreamSpec extends Specification { h.height == 650247 } - def "Send update status if methods were changed"() { - setup: - def chain = Chain.ETHEREUM__MAINNET - def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() { - @Override - void nativeCall(BlockchainOuterClass.NativeCallRequest request, StreamObserver responseObserver) { - } - - @Override - void subscribeHead(Common.Chain request, StreamObserver responseObserver) { - } - }) - def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, chain, client, new JsonRpcGrpcClient(client, chain, metrics), null, ChainsConfig.ChainConfig.default(), Schedulers.boundedElastic()) - upstream.setLag(0) - upstream.setStatus(UpstreamAvailability.OK) - when: - def statuses = upstream.observeStatus() - then: - StepVerifier.create(statuses) - .then { - upstream.update( - describe(["eth_getBlockByHash"]), - BlockchainOuterClass.BuildInfo.newBuilder() - .setVersion(buildInfo.version) - .build(), - ) - } - .expectNext(UpstreamAvailability.OK) - .then { - upstream.update( - describe(["eth_getBlockByHash"]), - BlockchainOuterClass.BuildInfo.newBuilder() - .setVersion(buildInfo.version) - .build(), - ) - } - .expectNextCount(0) - .then { - upstream.update( - describe(["eth_getBlockByHash", "eth_getBlockByHash1"]), - BlockchainOuterClass.BuildInfo.newBuilder() - .setVersion(buildInfo.version) - .build(), - ) - } - .expectNext(UpstreamAvailability.OK) - .thenCancel() - .verify(Duration.ofSeconds(3)) - } - private BlockchainOuterClass.DescribeChain describe(List methods) { return BlockchainOuterClass.DescribeChain.newBuilder() .setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))