Skip to content

Commit

Permalink
Fixes slot, directReader and wsHead bugs (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillPamPam authored Feb 1, 2024
1 parent 7fba7df commit ac5b443
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class BlockContainer @JvmOverloads constructor(

fun copyWithRating(nodeRating: Int): BlockContainer {
return BlockContainer(
height, hash, difficulty, timestamp, full, json, parsed, parentHash, transactions, nodeRating,
height, hash, difficulty, timestamp, full, json, parsed, parentHash, transactions, nodeRating, upstreamId, slot,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class CompoundReader<K, D> (
rdr.read(key)
.timeout(Defaults.timeoutInternal, Mono.empty())
.doOnError { t -> log.warn("Failed to read from $rdr", t) }
.onErrorResume { Mono.empty() }
}, 1,)
.next()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.EthereumCallSelector
import io.emeraldpay.dshackle.upstream.ethereum.json.BlockJson
import io.emeraldpay.dshackle.upstream.ethereum.json.TransactionJsonSnapshot
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcException
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.etherjar.domain.Address
import io.emeraldpay.etherjar.domain.BlockHash
Expand All @@ -31,6 +32,7 @@ import io.emeraldpay.etherjar.rpc.json.TransactionLogJson
import io.emeraldpay.etherjar.rpc.json.TransactionReceiptJson
import io.emeraldpay.etherjar.rpc.json.TransactionRefJson
import org.apache.commons.collections4.Factory
import org.apache.commons.lang3.exception.ExceptionUtils
import org.slf4j.LoggerFactory
import org.springframework.cloud.sleuth.Tracer
import reactor.core.publisher.Mono
Expand Down Expand Up @@ -81,7 +83,7 @@ class EthereumDirectReader(
override fun read(key: TransactionId): Mono<Result<TxContainer>> {
val request = JsonRpcRequest("eth_getTransactionByHash", listOf(key.toHex()))
return readWithQuorum(request) // retries were removed because we use NotNullQuorum which handle errors too
.timeout(Defaults.timeoutInternal, Mono.error(TimeoutException("Tx not read $key")))
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Tx not read $key")))
.flatMap { result ->
val tx = objectMapper.readValue(result.data, TransactionJsonSnapshot::class.java)
if (tx == null) {
Expand All @@ -96,6 +98,8 @@ class EthereumDirectReader(
if (tx.data.blockId != null) {
caches.cache(Caches.Tag.REQUESTED, tx.data)
}
}.onErrorResume {
Mono.error(JsonRpcException(request.id, ExceptionUtils.getRootCauseMessage(it)))
}
}
}
Expand Down Expand Up @@ -128,7 +132,7 @@ class EthereumDirectReader(
override fun read(key: TransactionId): Mono<Result<ByteArray>> {
val request = JsonRpcRequest("eth_getTransactionReceipt", listOf(key.toHex()))
return readWithQuorum(request)
.timeout(Defaults.timeoutInternal, Mono.error(TimeoutException("Receipt not read $key")))
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Receipt not read $key")))
.flatMap { result ->
val receipt = objectMapper.readValue(result.data, TransactionReceiptJson::class.java)
if (receipt == null) {
Expand All @@ -149,6 +153,8 @@ class EthereumDirectReader(
result,
)
}
}.onErrorResume {
Mono.error(JsonRpcException(request.id, ExceptionUtils.getRootCauseMessage(it)))
}
}
}
Expand Down Expand Up @@ -188,7 +194,7 @@ class EthereumDirectReader(
matcher: Selector.Matcher = Selector.empty,
): Mono<Result<BlockContainer>> {
return readWithQuorum(request, matcher)
.timeout(Defaults.timeoutInternal, Mono.error(TimeoutException("Block not read $id")))
.timeout(Duration.ofSeconds(5), Mono.error(TimeoutException("Block not read $id")))
.retryWhen(Retry.fixedDelay(3, Duration.ofMillis(200)))
.flatMap { result ->
val block = objectMapper.readValue(result.data, BlockJson::class.java) as BlockJson<TransactionRefJson>?
Expand All @@ -208,6 +214,8 @@ class EthereumDirectReader(
}
.doOnNext { block ->
caches.cache(Caches.Tag.REQUESTED, block.data)
}.onErrorResume {
Mono.error(JsonRpcException(request.id, ExceptionUtils.getRootCauseMessage(it)))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,12 @@ class GenericWsHead(
private var isSyncing = false

private var subscription: Disposable? = null
private var headResubSubscription: Disposable? = null
private val noHeadUpdatesSink = Sinks.many().multicast().directBestEffort<Boolean>()
private val headLivenessSink = Sinks.many().multicast().directBestEffort<Boolean>()

private var subscriptionId = AtomicReference("")

init {
registerHeadResubscribeFlux()
}

override fun isRunning(): Boolean {
return subscription != null
}
Expand All @@ -73,6 +70,10 @@ class GenericWsHead(
listenNewHeads(),
)
this.subscription = super.follow(heads)

if (headResubSubscription == null) {
headResubSubscription = registerHeadResubscribeFlux()
}
}

override fun onNoHeadUpdates() {
Expand All @@ -86,7 +87,7 @@ class GenericWsHead(
this.isSyncing = isSyncing
}

fun listenNewHeads(): Flux<BlockContainer> {
private fun listenNewHeads(): Flux<BlockContainer> {
return subscribe()
.map {
chainSpecific.parseHeader(it, "unknown")
Expand All @@ -102,7 +103,8 @@ class GenericWsHead(
override fun stop() {
super.stop()
cancelSub()
noHeadUpdatesSink.tryEmitComplete()
headResubSubscription?.dispose()
headResubSubscription = null
}

override fun headLiveness(): Flux<Boolean> = headLivenessSink.asFlux()
Expand Down Expand Up @@ -135,7 +137,7 @@ class GenericWsHead(
}
}

private fun registerHeadResubscribeFlux() {
private fun registerHeadResubscribeFlux(): Disposable {
val connectionStates = wsSubscriptions.connectionInfoFlux()
.map {
if (it.connectionId == connectionId && it.connectionState == WsConnection.ConnectionState.DISCONNECTED) {
Expand All @@ -150,10 +152,10 @@ class GenericWsHead(
return@map false
}

Flux.merge(
return Flux.merge(
noHeadUpdatesSink.asFlux(),
connectionStates,
).subscribeOn(wsConnectionResubscribeScheduler)
).publishOn(wsConnectionResubscribeScheduler)
.filter { it && !subscribed && connected && !isSyncing }
.subscribe {
log.warn("Restart ws head, upstreamId: $upstreamId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.emeraldpay.dshackle.upstream.ethereum
import io.emeraldpay.dshackle.Chain
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.test.GenericUpstreamMock
import io.emeraldpay.dshackle.test.TestingCommons
import io.emeraldpay.dshackle.upstream.BlockValidator
Expand Down Expand Up @@ -59,24 +60,32 @@ class GenericWsHeadSpec extends Specification {
Global.objectMapper.writeValueAsBytes(it)
}

def apiMock = TestingCommons.api()
def reader = Mock(Reader) {
1 * it.read(new JsonRpcRequest("eth_getBlockByNumber", List.of("latest", false))) >> Mono.empty()
}

def ws = Mock(WsSubscriptions) {
1 * it.connectionInfoFlux() >> Flux.empty()
}

def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE)
1 * ws.subscribe(_) >> new WsSubscriptions.SubscribeData(
Flux.fromIterable([headBlock]), "id", new AtomicReference<String>("")
)

def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, reader, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE)

def res = BlockContainer.from(block)
when:
def act = head.listenNewHeads().blockFirst()
def act = head.getFlux()

then:
act == res

1 * ws.subscribe(_) >> new WsSubscriptions.SubscribeData(
Flux.fromIterable([headBlock]), "id", new AtomicReference<String>("")
)
StepVerifier.create(act)
.then {
head.start()
}
.expectNext(res)
.thenCancel()
.verify(Duration.ofSeconds(3))
}

def "Restart ethereum ws head"() {
Expand Down Expand Up @@ -328,7 +337,9 @@ class GenericWsHeadSpec extends Specification {
block.uncles = []
block.totalDifficulty = BigInteger.ONE

def apiMock = TestingCommons.api()
def reader = Mock(Reader) {
1 * it.read(new JsonRpcRequest("eth_getBlockByNumber", List.of("latest", false))) >> Mono.empty()
}
def subId = "subId"
def ws = Mock(WsSubscriptions) {
1 * it.connectionInfoFlux() >> Flux.empty()
Expand All @@ -339,15 +350,19 @@ class GenericWsHeadSpec extends Specification {
Mono.just(new JsonRpcResponse("".bytes, null))
}

def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, apiMock, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE)
def head = new GenericWsHead(new AlwaysForkChoice(), BlockValidator.ALWAYS_VALID, reader, ws, Schedulers.boundedElastic(), Schedulers.boundedElastic(), upstream, EthereumChainSpecific.INSTANCE)

when:
def act = head.listenNewHeads()
def act = head.getFlux()

then:
StepVerifier.create(act)
.expectComplete()
.verify(Duration.ofSeconds(1))
.then {
head.start()
}
.expectNoEvent(Duration.ofMillis(100))
.thenCancel()
.verify(Duration.ofSeconds(3))
}

def "If there is ws disconnect then head must emit false its liveness state"() {
Expand Down

0 comments on commit ac5b443

Please sign in to comment.