Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Showing 8 changed files with 54 additions and 147 deletions.
15 changes: 13 additions & 2 deletions src/main/kotlin/io/emeraldpay/dshackle/rpc/SubscribeNodeStatus.kt
Original file line number Diff line number Diff line change
@@ -67,7 +67,7 @@ class SubscribeNodeStatus(
)

// subscribe on head/status updates for just added upstreams
val multiStreamUpdates = Flux.merge(
val adds = Flux.merge(
multistreams.all()
.map { ms ->
ms.subscribeAddedUpstreams()
@@ -92,8 +92,19 @@ class SubscribeNodeStatus(
}
}
)
val updates = Flux.merge(
multistreams.all().map { ms ->
ms.subscribeUpdatedUpstreams().map {
NodeStatusResponse.newBuilder()
.setNodeId(it.getId())
.setDescription(buildDescription(ms, it))
.setStatus(buildStatus(it.getStatus(), it.getHead().getCurrentHeight()))
.build()
}
}
)

return Flux.merge(upstreamUpdates, multiStreamUpdates, removals)
return Flux.merge(upstreamUpdates, adds, removals, updates)
}

private fun subscribeUpstreamUpdates(
33 changes: 4 additions & 29 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt
Original file line number Diff line number Diff line change
@@ -54,7 +54,7 @@ abstract class DefaultUpstream(
private val status = AtomicReference(Status(defaultLag, defaultAvail, statusByLag(defaultLag, defaultAvail)))
private val statusStream = Sinks.many()
.multicast()
.directBestEffort<UpstreamChangeState>()
.directBestEffort<UpstreamAvailability>()

init {
if (id.length < 3 || !id.matches(Regex("[a-zA-Z][a-zA-Z0-9_-]+[a-zA-Z0-9]"))) {
@@ -67,14 +67,9 @@ abstract class DefaultUpstream(
}

fun onStatus(value: BlockchainOuterClass.ChainStatus) {
this.onStatus(value, false)
}

fun onStatus(value: BlockchainOuterClass.ChainStatus, stateChanged: Boolean = false) {
val available = value.availability
setStatus(
if (available != null) UpstreamAvailability.fromGrpc(available.number) else UpstreamAvailability.UNAVAILABLE,
stateChanged
)
}

@@ -83,15 +78,11 @@ abstract class DefaultUpstream(
}

open fun setStatus(avail: UpstreamAvailability) {
this.setStatus(avail, false)
}

open fun setStatus(avail: UpstreamAvailability, stateChanged: Boolean = false) {
status.updateAndGet { curr ->
Status(curr.lag, avail, statusByLag(curr.lag, avail))
}.also {
statusStream.emitNext(
UpstreamChangeState(it.status, stateChanged)
it.status
) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
log.trace("Status of upstream [$id] changed to [$it], requested change status to [$avail]")
}
@@ -116,18 +107,7 @@ abstract class DefaultUpstream(
}

override fun observeStatus(): Flux<UpstreamAvailability> {
return statusStream.asFlux()
.distinctUntilChanged(
{ it },
{ prev, current ->
if (current.stateChanged) {
false
} else {
prev.status == current.status
}
}
)
.map { it.status }
return statusStream.asFlux().distinctUntilChanged()
}

override fun setLag(lag: Long) {
@@ -136,7 +116,7 @@ abstract class DefaultUpstream(
Status(nLag, curr.avail, statusByLag(nLag, curr.avail))
}.also {
statusStream.emitNext(
UpstreamChangeState(it.status, false)
it.status
) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
log.trace("Status of upstream [$id] changed to [$it], requested change lag to [$lag]")
}
@@ -171,9 +151,4 @@ abstract class DefaultUpstream(
}

data class Status(val lag: Long?, val avail: UpstreamAvailability, val status: UpstreamAvailability)

private data class UpstreamChangeState(
val status: UpstreamAvailability,
val stateChanged: Boolean
)
}
47 changes: 22 additions & 25 deletions src/main/kotlin/io/emeraldpay/dshackle/upstream/Multistream.kt
Original file line number Diff line number Diff line change
@@ -38,7 +38,6 @@ import reactor.core.Disposable
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks
import reactor.util.function.Tuples
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
@@ -80,9 +79,9 @@ abstract class Multistream(
private val removedUpstreams = Sinks.many()
.multicast()
.directBestEffort<Upstream>()
private val stateStream = Sinks.many()
private val updateUpstreams = Sinks.many()
.multicast()
.directBestEffort<UpstreamChangeState>()
.directBestEffort<Upstream>()

init {
UpstreamAvailability.values().forEach { status ->
@@ -206,16 +205,22 @@ abstract class Multistream(
open fun onUpstreamsUpdated() {
reconfigLock.withLock {
val upstreams = getAll()
upstreams.map { it.getMethods() }.let {
upstreams.filter { it.isAvailable() }.map { it.getMethods() }.let {
// TODO made list of uniq instances, and then if only one, just use it directly
callMethods = AggregatedCallMethods(it)
}
capabilities = if (upstreams.isEmpty()) {
emptySet()
} else {
upstreams.map { up ->
upstreams.filter { it.isAvailable() }.map { up ->
up.getCapabilities()
}.reduce { acc, curr -> acc + curr }
}.let {
if (it.isNotEmpty()) {
it.reduce { acc, curr -> acc + curr }
} else {
emptySet()
}
}
}
lagObserver?.stop()
lagObserver = null
@@ -281,26 +286,18 @@ abstract class Multistream(
}

private fun observeUpstreamsStatuses() {
stateStream.asFlux()
.subscribe {
upstreams.filter { it.isAvailable() }.map { it.getMethods() }.let {
callMethods = AggregatedCallMethods(it)
}
}

subscribeAddedUpstreams()
.filter { !it.isGrpc() }
.distinctUntilChanged {
it.getId()
}.map {
Tuples.of(it.getId(), it.observeStatus())
}.flatMap { upstream ->
upstream.observeStatus().map { upstream }
.takeUntilOther(
subscribeRemovedUpstreams()
.filter { it.getId() == upstream.getId() }
)
}
.subscribe { pair ->
pair.t2.subscribe { status ->
stateStream.emitNext(
UpstreamChangeState(pair.t1, status)
) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}
.subscribe {
onUpstreamChange(UpstreamChangeEvent(this.chain, it, UpstreamChangeEvent.ChangeType.UPDATED))
}
}

@@ -397,6 +394,7 @@ abstract class Multistream(
UpstreamChangeEvent.ChangeType.REVALIDATED -> {}
UpstreamChangeEvent.ChangeType.UPDATED -> {
onUpstreamsUpdated()
updateUpstreams.emitNext(event.upstream) { _, res -> res == Sinks.EmitResult.FAIL_NON_SERIALIZED }
}
UpstreamChangeEvent.ChangeType.ADDED -> {
if (!started) {
@@ -441,9 +439,8 @@ abstract class Multistream(

fun subscribeRemovedUpstreams(): Flux<Upstream> =
removedUpstreams.asFlux()

fun subscribeStateChanges(): Flux<UpstreamChangeState> =
stateStream.asFlux()
fun subscribeUpdatedUpstreams(): Flux<Upstream> =
updateUpstreams.asFlux()

abstract fun makeLagObserver(): HeadLagObserver

Original file line number Diff line number Diff line change
@@ -171,7 +171,7 @@ class BitcoinGrpcUpstream(
val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
capabilities = newCapabilities
}
conf.status?.let { status -> onStatus(status, upstreamStatusChanged) }
conf.status?.let { status -> onStatus(status) }
return buildInfoChanged || upstreamStatusChanged
}
}
Original file line number Diff line number Diff line change
@@ -156,7 +156,7 @@ open class EthereumGrpcUpstream(
val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
capabilities = newCapabilities
}
conf.status?.let { status -> onStatus(status, upstreamStatusChanged) }
conf.status?.let { status -> onStatus(status) }
val subsChanged = (conf.supportedSubscriptionsList != subscriptionTopics).also {
subscriptionTopics = conf.supportedSubscriptionsList
}
Original file line number Diff line number Diff line change
@@ -121,10 +121,10 @@ open class EthereumPosGrpcUpstream(
val upstreamStatusChanged = (upstreamStatus.update(conf) || (newCapabilities != capabilities)).also {
capabilities = newCapabilities
}
conf.status?.let { status -> onStatus(status, upstreamStatusChanged) }
val subsChanged = (conf.supportedSubscriptionsList != subscriptionTopics).also {
subscriptionTopics = conf.supportedSubscriptionsList
}
conf.status?.let { status -> onStatus(status) }
return buildInfoChanged || upstreamStatusChanged || subsChanged
}

Original file line number Diff line number Diff line change
@@ -260,45 +260,19 @@ class MultistreamSpec extends Specification {
ms.onUpstreamChange(
new UpstreamChangeEvent(Chain.ETHEREUM__MAINNET, up2, UpstreamChangeEvent.ChangeType.ADDED)
)
def states = ms.subscribeStateChanges()
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_UNAVAILABLE))
up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
then:
StepVerifier.create(states)
.then {
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_UNAVAILABLE))
up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
}
.expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.UNAVAILABLE))
.expectNext(new Multistream.UpstreamChangeState(up2.getId(), UpstreamAvailability.OK))
.expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.OK))
.then {
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3")
}
.then {
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_SYNCING))
}
.expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.SYNCING))
.then {
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2")
}
.then {
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
}
.expectNext(new Multistream.UpstreamChangeState(up1.getId(), UpstreamAvailability.OK))
.then {
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3")
}
.then {
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
}
.expectNextCount(0)
.then {
up2.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
}
.expectNextCount(0)
.thenCancel()
.verify(Duration.ofSeconds(3))

assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3")
when:
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_SYNCING))
then:
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2")
when:
up1.onStatus(status(BlockchainOuterClass.AvailabilityEnum.AVAIL_OK))
then:
assert ms.getMethods().supportedMethods == Set.of("eth_test1", "eth_test2", "eth_test3")
}

def "Filter older blocks on multistream head"() {
Original file line number Diff line number Diff line change
@@ -256,56 +256,6 @@ class EthereumGrpcUpstreamSpec extends Specification {
h.height == 650247
}

def "Send update status if methods were changed"() {
setup:
def chain = Chain.ETHEREUM__MAINNET
def client = mockServer.clientForServer(new BlockchainGrpc.BlockchainImplBase() {
@Override
void nativeCall(BlockchainOuterClass.NativeCallRequest request, StreamObserver<BlockchainOuterClass.NativeCallReplyItem> responseObserver) {
}

@Override
void subscribeHead(Common.Chain request, StreamObserver<BlockchainOuterClass.ChainHead> responseObserver) {
}
})
def upstream = new EthereumGrpcUpstream("test", hash, UpstreamsConfig.UpstreamRole.PRIMARY, chain, client, new JsonRpcGrpcClient(client, chain, metrics), null, ChainsConfig.ChainConfig.default(), Schedulers.boundedElastic())
upstream.setLag(0)
upstream.setStatus(UpstreamAvailability.OK)
when:
def statuses = upstream.observeStatus()
then:
StepVerifier.create(statuses)
.then {
upstream.update(
describe(["eth_getBlockByHash"]),
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(buildInfo.version)
.build(),
)
}
.expectNext(UpstreamAvailability.OK)
.then {
upstream.update(
describe(["eth_getBlockByHash"]),
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(buildInfo.version)
.build(),
)
}
.expectNextCount(0)
.then {
upstream.update(
describe(["eth_getBlockByHash", "eth_getBlockByHash1"]),
BlockchainOuterClass.BuildInfo.newBuilder()
.setVersion(buildInfo.version)
.build(),
)
}
.expectNext(UpstreamAvailability.OK)
.thenCancel()
.verify(Duration.ofSeconds(3))
}

private BlockchainOuterClass.DescribeChain describe(List<String> methods) {
return BlockchainOuterClass.DescribeChain.newBuilder()
.setStatus(BlockchainOuterClass.ChainStatus.newBuilder().setQuorum(1).setAvailabilityValue(UpstreamAvailability.OK.grpcId))

0 comments on commit 3b7a89f

Please sign in to comment.