Skip to content

Commit

Permalink
send original json of ws subscription heads (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
a10zn8 authored Feb 2, 2024
1 parent 46bb122 commit 5e94e52
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package io.emeraldpay.dshackle.upstream.ethereum.subscribe
import io.emeraldpay.dshackle.upstream.Multistream
import io.emeraldpay.dshackle.upstream.Selector
import io.emeraldpay.dshackle.upstream.SubscriptionConnect
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.NewHeadMessage
import reactor.core.publisher.Flux
import reactor.core.scheduler.Scheduler
import java.time.Duration
Expand All @@ -30,11 +29,11 @@ import java.util.concurrent.ConcurrentHashMap
class ConnectNewHeads(
private val upstream: Multistream,
private val scheduler: Scheduler,
) : SubscriptionConnect<NewHeadMessage> {
) : SubscriptionConnect<ByteArray> {

private val connected: MutableMap<String, Flux<NewHeadMessage>> = ConcurrentHashMap()
private val connected: MutableMap<String, Flux<ByteArray>> = ConcurrentHashMap()

override fun connect(matcher: Selector.Matcher): Flux<NewHeadMessage> =
override fun connect(matcher: Selector.Matcher): Flux<ByteArray> =
connected.computeIfAbsent(matcher.describeInternal()) { key ->
ProduceNewHeads(upstream.getHead(matcher))
.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum.subscribe

import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.upstream.Head
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.NewHeadMessage
import io.emeraldpay.etherjar.hex.HexData
Expand All @@ -29,31 +30,36 @@ class ProduceNewHeads(
val head: Head,
) {

fun start(): Flux<NewHeadMessage> {
fun start(): Flux<ByteArray> {
return head.getFlux()
.map {
val block = it.toBlock()
NewHeadMessage(
block.number,
block.hash,
block.parentHash,
block.timestamp,
block.difficulty,
block.gasLimit,
block.gasUsed,
block.logsBloom,
block.miner,
block.baseFeePerGas?.amount,
block.extraData ?: HexData.empty(),
block.mixHash,
block.nonce,
block.receiptsRoot,
block.sha3Uncles,
block.stateRoot,
block.transactionsRoot,
block.withdrawalsRoot,
it.upstreamId,
)
if (it.full || it.json == null) {
val block = it.toBlock()
val msg = NewHeadMessage(
block.number,
block.hash,
block.parentHash,
block.timestamp,
block.difficulty,
block.gasLimit,
block.gasUsed,
block.logsBloom,
block.miner,
block.baseFeePerGas?.amount,
block.extraData ?: HexData.empty(),
block.mixHash,
block.nonce,
block.receiptsRoot,
block.sha3Uncles,
block.stateRoot,
block.transactionsRoot,
block.withdrawalsRoot,
it.upstreamId,
)
Global.objectMapper.writeValueAsBytes(msg)
} else {
it.json
}
}
}
}

0 comments on commit 5e94e52

Please sign in to comment.