Skip to content

Commit

Permalink
Fix deadlock, remove setting UNAVAIL status (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Nov 6, 2023
1 parent 160e5ae commit e4c13ad
Show file tree
Hide file tree
Showing 34 changed files with 250 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ open class SchedulersConfig {
return makeScheduler("ws-connection-resubscribe-scheduler", 2, monitoringConfig)
}

@Bean
open fun wsScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("ws-scheduler", 4, monitoringConfig)
}

@Bean
open fun headLivenessScheduler(monitoringConfig: MonitoringConfig): Scheduler {
return makeScheduler("head-liveness-scheduler", 4, monitoringConfig)
}

@Bean
open fun grpcChannelExecutor(monitoringConfig: MonitoringConfig): Executor {
return makePool("grpc-client-channel", 10, monitoringConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ import org.springframework.context.ApplicationEventPublisher
import org.springframework.stereotype.Component
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import java.lang.IllegalStateException
import java.net.URI
import java.util.concurrent.Executor
import java.util.concurrent.Executors
Expand All @@ -95,6 +94,8 @@ open class ConfiguredUpstreams(
private val clientSpansInterceptor: ClientInterceptor?,
@Qualifier("headScheduler")
private val headScheduler: Scheduler,
private val wsScheduler: Scheduler,
private val headLivenessScheduler: Scheduler,
private val authorizationConfig: AuthorizationConfig,
private val grpcAuthContext: GrpcAuthContext,
) : ApplicationRunner {
Expand Down Expand Up @@ -258,7 +259,7 @@ open class ConfiguredUpstreams(
options,
config.role,
methods,
QuorumForLabels.QuorumItem(1, config.labels),
QuorumForLabels.QuorumItem(1, UpstreamsConfig.Labels.fromMap(config.labels)),
chainConfig,
connectorFactory,
eventPublisher,
Expand Down Expand Up @@ -388,7 +389,7 @@ open class ConfiguredUpstreams(
chain,
endpoint.url,
endpoint.origin ?: URI("http://localhost"),
headScheduler,
wsScheduler,
).apply {
config = endpoint
basicAuth = endpoint.basicAuth
Expand Down Expand Up @@ -424,6 +425,7 @@ open class ConfiguredUpstreams(
blockValidator,
wsConnectionResubscribeScheduler,
headScheduler,
headLivenessScheduler,
chainsConf.expectedBlockTime,
)
if (!connectorFactory.isValid()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ abstract class AbstractHead @JvmOverloads constructor(
// NOOP
}

override fun headLiveness(): Flux<Boolean> {
return Flux.empty()
}

override fun start() {
stopping = false
log.debug("Start ${this.javaClass.simpleName} $upstreamId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ abstract class DefaultUpstream(
private val statusStream = Sinks.many()
.multicast()
.directBestEffort<UpstreamAvailability>()
protected val stateStream: Sinks.Many<Boolean> = Sinks.many()
.multicast()
.directBestEffort()

init {
if (id.length < 3 || !id.matches(Regex("[a-zA-Z][a-zA-Z0-9_-]+[a-zA-Z0-9]"))) {
Expand Down Expand Up @@ -106,6 +109,10 @@ abstract class DefaultUpstream(
return statusStream.asFlux().distinctUntilChanged()
}

override fun observeState(): Flux<Boolean> {
return stateStream.asFlux()
}

override fun setLag(lag: Long) {
lag.coerceAtLeast(0).let { nLag ->
status.updateAndGet { curr ->
Expand Down
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/EmptyHead.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ class EmptyHead : Head {

override fun onSyncingNode(isSyncing: Boolean) {
}

override fun headLiveness(): Flux<Boolean> = Flux.empty()
}
2 changes: 2 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Head.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ interface Head {
fun stop()

fun onSyncingNode(isSyncing: Boolean)

fun headLiveness(): Flux<Boolean>
}
12 changes: 10 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ abstract class Multistream(
).distinct()
}

override fun observeState(): Flux<Boolean> {
return Flux.empty()
}

override fun isAvailable(): Boolean {
return getAll().any { it.isAvailable() }
}
Expand Down Expand Up @@ -315,10 +319,14 @@ abstract class Multistream(
.distinctUntilChanged {
it.getId()
}.flatMap { upstream ->
upstream.observeStatus().map { upstream }
val statusStream = upstream.observeStatus().map { upstream }
val stateStream = upstream.observeState().map { upstream }
Flux.merge(stateStream, statusStream)
.takeUntilOther(
subscribeRemovedUpstreams()
.filter { it.getId() == upstream.getId() },
.filter {
it.getId() == upstream.getId()
},
)
}
.subscribe {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ interface Upstream : Lifecycle {
fun isAvailable(): Boolean
fun getStatus(): UpstreamAvailability
fun observeStatus(): Flux<UpstreamAvailability>
fun observeState(): Flux<Boolean>
fun getHead(): Head

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,6 @@ class EnrichedMergedHead constructor(
}

override fun onSyncingNode(isSyncing: Boolean) {}

override fun headLiveness(): Flux<Boolean> = Flux.empty()
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ object EthereumChainSpecific : ChainSpecific {

override fun latestBlockRequest() = JsonRpcRequest("eth_getBlockByNumber", listOf("latest", false))
override fun listenNewHeadsRequest(): JsonRpcRequest = JsonRpcRequest("eth_subscribe", listOf("newHeads"))
override fun unsubscribeNewHeadsRequest(subId: String): JsonRpcRequest =
JsonRpcRequest("eth_unsubscribe", listOf(subId))

override fun localReaderBuilder(
cachingReader: CachingReader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import io.emeraldpay.dshackle.reader.JsonRpcReader
import io.emeraldpay.dshackle.upstream.BlockValidator
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.Lifecycle
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import io.emeraldpay.dshackle.upstream.forkchoice.ForkChoice
import io.emeraldpay.dshackle.upstream.generic.ChainSpecific
import io.emeraldpay.dshackle.upstream.generic.GenericHead
Expand All @@ -32,6 +31,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Scheduler
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

class GenericWsHead(
forkChoice: ForkChoice,
Expand All @@ -40,7 +40,7 @@ class GenericWsHead(
private val wsSubscriptions: WsSubscriptions,
private val wsConnectionResubscribeScheduler: Scheduler,
headScheduler: Scheduler,
private val upstream: DefaultUpstream,
upstream: DefaultUpstream,
private val chainSpecific: ChainSpecific,
) : GenericHead(upstream.getId(), forkChoice, blockValidator, headScheduler, chainSpecific), Lifecycle {

Expand All @@ -51,6 +51,9 @@ class GenericWsHead(

private var subscription: Disposable? = null
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()
private val headLivenessSink = Sinks.many().multicast().directBestEffort<Boolean>()

private var subscriptionId = AtomicReference("")

init {
registerHeadResubscribeFlux()
Expand Down Expand Up @@ -85,18 +88,14 @@ class GenericWsHead(

fun listenNewHeads(): Flux<BlockContainer> {
return subscribe()
.transform {
Flux.concat(it.next().doOnNext { upstream.setStatus(UpstreamAvailability.OK) }, it)
}
.map {
chainSpecific.parseHeader(it, "unknown")
}
.timeout(Duration.ofSeconds(60), Mono.error(RuntimeException("No response from subscribe to newHeads")))
.onErrorResume {
log.error("Error getting heads for $upstreamId - ${it.message}")
upstream.setStatus(UpstreamAvailability.UNAVAILABLE)
subscribed = false
Mono.empty()
unsubscribe()
}
}

Expand All @@ -106,13 +105,27 @@ class GenericWsHead(
noHeadUpdatesSink.tryEmitComplete()
}

override fun headLiveness(): Flux<Boolean> = headLivenessSink.asFlux()

private fun unsubscribe(): Mono<BlockContainer> {
return wsSubscriptions.unsubscribe(chainSpecific.unsubscribeNewHeadsRequest(subscriptionId.get()).copy(id = ids.getAndIncrement()))
.flatMap { it.requireResult() }
.doOnNext { log.warn("{} has just unsubscribed from newHeads", upstreamId) }
.onErrorResume {
log.error("{} couldn't unsubscribe from newHeads", upstreamId, it)
Mono.empty()
}
.then(Mono.empty())
}

private val ids = AtomicInteger(1)

private fun subscribe(): Flux<ByteArray> {
return try {
wsSubscriptions.subscribe(chainSpecific.listenNewHeadsRequest().copy(id = ids.getAndIncrement()))
.also {
connectionId = it.connectionId
subscriptionId = it.subId
if (!connected) {
connected = true
}
Expand All @@ -126,6 +139,7 @@ class GenericWsHead(
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
headLivenessSink.emitNext(false) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
subscribed = false
connected = false
connectionId = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ class HeadLivenessValidator(
}

fun getFlux(): Flux<Boolean> {
val headLiveness = head.headLiveness()
// first we have moving window of 2 blocks and check that they are consecutive ones
return head.getFlux().map { it.height }.buffer(2, 1).map {
val headFlux = head.getFlux().map { it.height }.buffer(2, 1).map {
it.last() - it.first() == 1L
}.scan(Pair(0, true)) { acc, value ->
// then we accumulate consecutive true events, false resets counter
Expand Down Expand Up @@ -52,5 +53,7 @@ class HeadLivenessValidator(
}
},
).repeat().subscribeOn(scheduler)

return Flux.merge(headFlux, headLiveness)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ open class WsConnectionFactory(
)
}

open fun createWsConnection(connIndex: Int = 0, onDisconnect: () -> Unit): WsConnection =
WsConnectionImpl(uri, origin, basicAuth, metrics(connIndex), onDisconnect, scheduler).also { ws ->
open fun createWsConnection(connIndex: Int = 0): WsConnection =
WsConnectionImpl(uri, origin, basicAuth, metrics(connIndex), scheduler).also { ws ->
config?.frameSize?.let {
ws.frameSize = it
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ open class WsConnectionImpl(
private val origin: URI,
private val basicAuth: AuthConfig.ClientBasicAuth?,
private val rpcMetrics: RpcMetrics?,
private val onDisconnect: () -> Unit,
private val scheduler: Scheduler,
) : AutoCloseable, WsConnection, Cloneable {

Expand Down Expand Up @@ -198,7 +197,6 @@ open class WsConnectionImpl(
connection = HttpClient.create()
.resolver(DefaultAddressResolverGroup.INSTANCE)
.doOnDisconnected {
onDisconnect()
disconnects.tryEmitNext(Instant.now())
log.info("Disconnected from $uri")
if (keepConnection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import org.springframework.util.backoff.BackOffExecution
import org.springframework.util.backoff.ExponentialBackOff
import reactor.core.Disposable
Expand All @@ -39,7 +37,6 @@ import kotlin.concurrent.write
*/
class WsConnectionMultiPool(
private val wsConnectionFactory: WsConnectionFactory,
private val upstream: DefaultUpstream,
private val connections: Int,
) : WsConnectionPool {

Expand Down Expand Up @@ -110,16 +107,14 @@ class WsConnectionMultiPool(
SCHEDULE_FULL
} else {
current.add(
wsConnectionFactory.createWsConnection(connIndex++) {
if (isUnavailable()) {
upstream.setStatus(UpstreamAvailability.UNAVAILABLE)
}
}.also {
it.connect()
connectionSubscriptionMap[it.connectionId()] = it.connectionInfoFlux().subscribe { info ->
connectionInfo.emitNext(info) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}
},
wsConnectionFactory.createWsConnection(connIndex++)
.also {
it.connect()
connectionSubscriptionMap[it.connectionId()] = it.connectionInfoFlux()
.subscribe { info ->
connectionInfo.emitNext(info) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}
},
)
SCHEDULE_GROW
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ class WsConnectionPoolFactory(
"Creating instance for different upstream. ${upstream.getId()} != id"
}
return if (connections > 1) {
WsConnectionMultiPool(wsConnectionFactory, upstream, connections)
WsConnectionMultiPool(wsConnectionFactory, connections)
} else {
WsConnectionSinglePool(wsConnectionFactory, upstream)
WsConnectionSinglePool(wsConnectionFactory)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.UpstreamAvailability
import reactor.core.publisher.Flux

class WsConnectionSinglePool(
wsConnectionFactory: WsConnectionFactory,
private val upstream: DefaultUpstream,
) : WsConnectionPool {
private val connection = wsConnectionFactory.createWsConnection {
upstream.setStatus(UpstreamAvailability.UNAVAILABLE)
}
private val connection = wsConnectionFactory.createWsConnection()

override fun connect() {
if (!connection.isConnected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package io.emeraldpay.dshackle.upstream.ethereum

import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicReference

/**
* A JSON-RPC Subscription client.
Expand All @@ -42,8 +45,11 @@ interface WsSubscriptions {

fun connectionInfoFlux(): Flux<WsConnection.ConnectionInfo>

fun unsubscribe(request: JsonRpcRequest): Mono<JsonRpcResponse>

data class SubscribeData(
val data: Flux<ByteArray>,
val connectionId: String,
val subId: AtomicReference<String>,
)
}
Loading

0 comments on commit e4c13ad

Please sign in to comment.