diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt index 83f88559d..2f2d1c6cc 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectNewHeads.kt @@ -18,7 +18,6 @@ package io.emeraldpay.dshackle.upstream.ethereum.subscribe import io.emeraldpay.dshackle.upstream.Multistream import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.SubscriptionConnect -import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.NewHeadMessage import reactor.core.publisher.Flux import reactor.core.scheduler.Scheduler import java.time.Duration @@ -30,11 +29,11 @@ import java.util.concurrent.ConcurrentHashMap class ConnectNewHeads( private val upstream: Multistream, private val scheduler: Scheduler, -) : SubscriptionConnect { +) : SubscriptionConnect { - private val connected: MutableMap> = ConcurrentHashMap() + private val connected: MutableMap> = ConcurrentHashMap() - override fun connect(matcher: Selector.Matcher): Flux = + override fun connect(matcher: Selector.Matcher): Flux = connected.computeIfAbsent(matcher.describeInternal()) { key -> ProduceNewHeads(upstream.getHead(matcher)) .start() diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceNewHeads.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceNewHeads.kt index baf84d8ea..263a04f7b 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceNewHeads.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ProduceNewHeads.kt @@ -15,6 +15,7 @@ */ package io.emeraldpay.dshackle.upstream.ethereum.subscribe +import io.emeraldpay.dshackle.Global import io.emeraldpay.dshackle.upstream.Head import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.NewHeadMessage import io.emeraldpay.etherjar.hex.HexData @@ -29,31 +30,36 @@ class ProduceNewHeads( val head: Head, ) { - fun start(): Flux { + fun start(): Flux { return head.getFlux() .map { - val block = it.toBlock() - NewHeadMessage( - block.number, - block.hash, - block.parentHash, - block.timestamp, - block.difficulty, - block.gasLimit, - block.gasUsed, - block.logsBloom, - block.miner, - block.baseFeePerGas?.amount, - block.extraData ?: HexData.empty(), - block.mixHash, - block.nonce, - block.receiptsRoot, - block.sha3Uncles, - block.stateRoot, - block.transactionsRoot, - block.withdrawalsRoot, - it.upstreamId, - ) + if (it.full || it.json == null) { + val block = it.toBlock() + val msg = NewHeadMessage( + block.number, + block.hash, + block.parentHash, + block.timestamp, + block.difficulty, + block.gasLimit, + block.gasUsed, + block.logsBloom, + block.miner, + block.baseFeePerGas?.amount, + block.extraData ?: HexData.empty(), + block.mixHash, + block.nonce, + block.receiptsRoot, + block.sha3Uncles, + block.stateRoot, + block.transactionsRoot, + block.withdrawalsRoot, + it.upstreamId, + ) + Global.objectMapper.writeValueAsBytes(msg) + } else { + it.json + } } } }