diff --git a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt index 426ce9e46..acd0d6466 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt @@ -340,11 +340,12 @@ open class ConfiguredUpstreams( this.options = options } log.info("Using ALL CHAINS (gRPC) upstream, at ${endpoint.host}:${endpoint.port}") - ds.start() + ds.subscribeUpstreamChanges() .doOnNext { log.info("Chain ${it.chain} ${it.type} through gRPC at ${endpoint.host}:${endpoint.port}. With caps: ${it.upstream.getCapabilities()}") } .subscribe(currentUpstreams::update) + ds.startStatusUpdates() } private fun buildHttpClient(config: UpstreamsConfig.Upstream): JsonRpcHttpClient? { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt index 53639ae53..90f6ba16c 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcHead.kt @@ -18,21 +18,23 @@ package io.emeraldpay.dshackle.upstream.grpc import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.api.proto.Common import io.emeraldpay.api.proto.ReactorBlockchainGrpc -import io.emeraldpay.dshackle.Defaults +import io.emeraldpay.dshackle.SilentException +import io.emeraldpay.dshackle.commons.DurableFlux import io.emeraldpay.dshackle.data.BlockContainer import io.emeraldpay.dshackle.upstream.AbstractHead import io.emeraldpay.dshackle.upstream.DefaultUpstream import io.emeraldpay.dshackle.upstream.UpstreamAvailability -import io.emeraldpay.etherjar.rpc.RpcException +import io.emeraldpay.grpc.BlockchainType import io.emeraldpay.grpc.Chain import org.reactivestreams.Publisher import org.slf4j.LoggerFactory import org.springframework.context.Lifecycle +import org.springframework.util.backoff.ExponentialBackOff import reactor.core.Disposable import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.util.retry.Retry import java.time.Duration +import java.util.concurrent.atomic.AtomicBoolean import java.util.function.Function class GrpcHead( @@ -54,6 +56,7 @@ class GrpcHead( } private var headSubscription: Disposable? = null + private val shouldBeRunning = AtomicBoolean(false) /** * Initiate a new head subscription with connection to the remote @@ -64,51 +67,54 @@ class GrpcHead( } log.debug("Start Head subscription to ${parent.getId()}") - val source = Flux.concat( - // first connect immediately - Flux.just(remote), - // following requests do with delay, give it a time to recover - Flux.just(remote).repeat().delayElements(Defaults.retryConnection) - ).flatMap(this::subscribeHead) - - internalStart(source) + val blocks = DurableFlux( + { connect(remote) }, + ExponentialBackOff(100, 1.5), + log, + shouldBeRunning, + ) + headSubscription = super.follow(blocks.connect()) } - fun subscribeHead(client: ReactorBlockchainGrpc.ReactorBlockchainStub): Publisher { + private fun connect(remote: ReactorBlockchainGrpc.ReactorBlockchainStub): Flux { val chainRef = Common.Chain.newBuilder() .setTypeValue(chain.id) .build() - return client.subscribeHead(chainRef) - // simple retry on failure, if eventually failed then it supposed to resubscribe later from outer method - .retryWhen(Retry.backoff(4, Duration.ofSeconds(1))) - .onErrorContinue { err, _ -> - log.warn("Disconnected $chain from ${parent.getId()}: ${err.message}") + return remote.subscribeHead(chainRef) + // if nothing returned for a relatively long period it's probably because of a broken connection, so in this case we force to drop the connection + .timeout( + expectEventsTime(), + Mono.fromCallable { log.info("No events received from ${parent.getId()}. Reconnecting...") } + .then(Mono.error(SilentException.Timeout("No Events"))) + ) + .doOnError { err -> + if (err !is SilentException) { + log.warn("Disconnected $chain from ${parent.getId()}: ${err.message}") + } parent.setStatus(UpstreamAvailability.UNAVAILABLE) - Mono.empty() } + .map(converter) + .distinctUntilChanged(BlockContainer::hash) + .transform(enhanced()) } - /** - * Initiate a new head from provided source of head details - */ - private fun internalStart(source: Flux) { - var blocks = source.map(converter) - .distinctUntilChanged { - it.hash + private fun expectEventsTime(): Duration { + return try { + when (BlockchainType.from(chain)) { + BlockchainType.BITCOIN -> Duration.ofHours(1) + BlockchainType.ETHEREUM -> Duration.ofMinutes(5) } - if (enhancer != null) { - blocks = blocks.flatMap(enhancer) + } catch (e: IllegalArgumentException) { + Duration.ofMinutes(15) } + } - blocks = blocks.onErrorContinue { err, _ -> - if (err is RpcException) { - log.error("Head subscription error on ${parent.getId()}. ${err.javaClass.name}:${err.message}") - } else { - log.error("Head subscription error on ${parent.getId()}. ${err.javaClass.name}:${err.message}", err) - } + private fun enhanced(): Function, Flux> { + return if (enhancer != null) { + Function { blocks -> blocks.flatMap(enhancer) } + } else { + Function.identity() } - - headSubscription = super.follow(blocks) } override fun isRunning(): Boolean { @@ -117,10 +123,12 @@ class GrpcHead( override fun start() { headSubscription?.dispose() + shouldBeRunning.set(true) this.internalStart(remote) } override fun stop() { + shouldBeRunning.set(false) headSubscription?.dispose() } } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt index 53a4c625c..05f1f4ee6 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/grpc/GrpcUpstreams.kt @@ -19,6 +19,7 @@ package io.emeraldpay.dshackle.upstream.grpc import io.emeraldpay.api.proto.BlockchainOuterClass import io.emeraldpay.api.proto.ReactorBlockchainGrpc import io.emeraldpay.dshackle.FileResolver +import io.emeraldpay.dshackle.commons.DurableFlux import io.emeraldpay.dshackle.config.AuthConfig import io.emeraldpay.dshackle.config.UpstreamsConfig import io.emeraldpay.dshackle.monitoring.Channel @@ -46,12 +47,15 @@ import io.netty.handler.ssl.SslContextBuilder import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import org.slf4j.LoggerFactory +import org.springframework.util.backoff.ExponentialBackOff import reactor.core.Disposable import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers import java.net.ConnectException import java.time.Duration -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock +import java.util.function.Supplier import kotlin.concurrent.withLock class GrpcUpstreams( @@ -66,40 +70,79 @@ class GrpcUpstreams( var options = UpstreamsConfig.PartialOptions.getDefaults().build() - private var client: ReactorBlockchainGrpc.ReactorBlockchainStub? = null + private var clientValue: ReactorBlockchainGrpc.ReactorBlockchainStub? = null private val known = HashMap() private val lock = ReentrantLock() - fun start(): Flux { - val channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) { - NettyChannelBuilder.forAddress(conn.host, conn.port) - // some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces) - .maxInboundMessageSize(Int.MAX_VALUE) - .useTransportSecurity() - .enableRetry() - .maxRetryAttempts(3) - .sslContext(withTls(conn.auth!!)) - } else { - ManagedChannelBuilder.forAddress(conn.host, conn.port) - .let { - if (conn.autoTls == true) { - it.useTransportSecurity() - } else { - log.warn("Using insecure connection to ${conn.host}:${conn.port}") - it.usePlaintext() + private val client: ReactorBlockchainGrpc.ReactorBlockchainStub + get() { + if (clientValue != null) { + return clientValue!! + } + val channel: ManagedChannelBuilder<*> = if (conn.auth != null && StringUtils.isNotEmpty(conn.auth!!.ca)) { + NettyChannelBuilder.forAddress(conn.host, conn.port) + // some messages are very large. many of them in megabytes, some even in gigabytes (ex. ETH Traces) + .maxInboundMessageSize(Int.MAX_VALUE) + .useTransportSecurity() + .enableRetry() + .maxRetryAttempts(3) + .sslContext(withTls(conn.auth!!)) + } else { + ManagedChannelBuilder.forAddress(conn.host, conn.port) + .let { + if (conn.autoTls == true) { + it.useTransportSecurity() + } else { + log.warn("Using insecure connection to ${conn.host}:${conn.port}") + it.usePlaintext() + } } - } + } + + this.clientValue = ReactorBlockchainGrpc.newReactorStub(channel.build()) + return this.clientValue!! } - val client = ReactorBlockchainGrpc.newReactorStub(channel.build()) - this.client = client + fun subscribeUpstreamChanges(): Flux { + val connect = { + Flux.interval(Duration.ZERO, Duration.ofMinutes(1)) + .flatMap { client.describe(BlockchainOuterClass.DescribeRequest.newBuilder().build()) } + .transform(catchIOError()) + .flatMap(::processDescription) + .doOnError { t -> log.error("Failed to process update from gRPC upstream $id", t) } + } - val statusSubscription = AtomicReference() + return DurableFlux( + connect, + ExponentialBackOff(100L, 1.5), + log, + AtomicBoolean(true) + ).connect() + } - val updates = Flux.interval(Duration.ZERO, Duration.ofMinutes(1)) - .flatMap { - client.describe(BlockchainOuterClass.DescribeRequest.newBuilder().build()) - }.onErrorContinue { t, _ -> + fun startStatusUpdates(): Disposable { + val connection = DurableFlux( + { + client + .subscribeStatus(BlockchainOuterClass.StatusRequest.newBuilder().build()) + .transform(catchIOError()) + }, + ExponentialBackOff(100L, 1.5), + log, + AtomicBoolean(true) + ) + return connection + .connect() + .subscribeOn(Schedulers.boundedElastic()) + .subscribe { value -> + val chain = Chain.byId(value.chain.number) + known[chain]?.onStatus(value) + } + } + + fun catchIOError(): java.util.function.Function, Flux> { + return java.util.function.Function, Flux> { source -> + source.onErrorContinue { t, _ -> if (ExceptionUtils.indexOfType(t, ConnectException::class.java) >= 0) { log.warn("gRPC upstream ${conn.host}:${conn.port} is unavailable. (${t.javaClass}: ${t.message})") known.values.forEach { @@ -108,25 +151,8 @@ class GrpcUpstreams( } else { log.error("Failed to get description from ${conn.host}:${conn.port}", t) } - }.flatMap { value -> - processDescription(value) - }.doOnNext { - val subscription = client.subscribeStatus(BlockchainOuterClass.StatusRequest.newBuilder().build()) - .subscribe { value -> - val chain = Chain.byId(value.chain.number) - if (chain != Chain.UNSPECIFIED) { - known[chain]?.onStatus(value) - } - } - statusSubscription.updateAndGet { prev -> - prev?.dispose() - subscription - } - }.doOnError { t -> - log.error("Failed to process update from gRPC upstream $id", t) } - - return updates + } } fun processDescription(value: BlockchainOuterClass.DescribeResponse): Flux { @@ -180,29 +206,31 @@ class GrpcUpstreams( } fun getOrCreate(chain: Chain): UpstreamChange { - val metricsTags = listOf( - Tag.of("upstream", id), - Tag.of("chain", chain.chainCode) - ) + val metrics = Supplier { + val metricsTags = listOf( + Tag.of("upstream", id), + Tag.of("chain", chain.chainCode) + ) - val metrics = RpcMetrics( - metricsTags, - timer = Timer.builder("upstream.grpc.conn") - .description("Request time through a Dshackle/gRPC connection") - .tags(metricsTags) - .publishPercentileHistogram() - .register(Metrics.globalRegistry), - fails = Counter.builder("upstream.grpc.fail") - .description("Number of failures of Dshackle/gRPC requests") - .tags(metricsTags) - .register(Metrics.globalRegistry), - responseSize = DistributionSummary.builder("upstream.grpc.response.size") - .description("Size of Dshackle/gRPC responses") - .baseUnit("Bytes") - .tags(metricsTags) - .register(Metrics.globalRegistry), - connectionMetrics = ConnectionMetrics(metricsTags) - ) + RpcMetrics( + metricsTags, + timer = Timer.builder("upstream.grpc.conn") + .description("Request time through a Dshackle/gRPC connection") + .tags(metricsTags) + .publishPercentileHistogram() + .register(Metrics.globalRegistry), + fails = Counter.builder("upstream.grpc.fail") + .description("Number of failures of Dshackle/gRPC requests") + .tags(metricsTags) + .register(Metrics.globalRegistry), + responseSize = DistributionSummary.builder("upstream.grpc.response.size") + .description("Size of Dshackle/gRPC responses") + .baseUnit("Bytes") + .tags(metricsTags) + .register(Metrics.globalRegistry), + connectionMetrics = ConnectionMetrics(metricsTags) + ) + } val blockchainType = BlockchainType.from(chain) if (blockchainType == BlockchainType.ETHEREUM) { @@ -214,14 +242,14 @@ class GrpcUpstreams( } } - fun getOrCreateEthereum(chain: Chain, metrics: RpcMetrics): UpstreamChange { + fun getOrCreateEthereum(chain: Chain, metrics: Supplier): UpstreamChange { lock.withLock { val current = known[chain] return if (current == null) { - val rpcClient = JsonRpcGrpcClient(client!!, chain, metrics) { + val rpcClient = JsonRpcGrpcClient(client, chain, metrics.get()) { currentRequestLogWriter.wrap(it, id, Channel.DSHACKLE) } - val created = EthereumGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client!!, rpcClient) + val created = EthereumGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client, rpcClient) created.timeout = this.options.timeout known[chain] = created created.start() @@ -232,14 +260,14 @@ class GrpcUpstreams( } } - fun getOrCreateBitcoin(chain: Chain, metrics: RpcMetrics): UpstreamChange { + fun getOrCreateBitcoin(chain: Chain, metrics: Supplier): UpstreamChange { lock.withLock { val current = known[chain] return if (current == null) { - val rpcClient = JsonRpcGrpcClient(client!!, chain, metrics) { + val rpcClient = JsonRpcGrpcClient(client, chain, metrics.get()) { currentRequestLogWriter.wrap(it, id, Channel.DSHACKLE) } - val created = BitcoinGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client!!, rpcClient) + val created = BitcoinGrpcUpstream(id, forkWatchFactory.create(chain), role, chain, this.options, client, rpcClient) created.timeout = this.options.timeout known[chain] = created created.start()