Skip to content

Commit

Permalink
problem: doesn't connect to Bitcoin networks over gRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
splix committed Sep 1, 2020
1 parent 06930b6 commit c3d3d94
Show file tree
Hide file tree
Showing 22 changed files with 899 additions and 218 deletions.
1 change: 1 addition & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/Defaults.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ class Defaults {
companion object {
val timeout: Duration = Duration.ofSeconds(60)
val timeoutInternal: Duration = timeout.dividedBy(4)
val retryConnection: Duration = Duration.ofSeconds(10)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class BlockContainer(
}

@JvmStatic
fun from(raw: ByteArray): BlockContainer {
fun fromEthereumJson(raw: ByteArray): BlockContainer {
val block = Global.objectMapper.readValue(raw, BlockJson::class.java)
return from(block, raw)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,16 @@
*/
package io.emeraldpay.dshackle.startup

import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.FileResolver
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.cache.CachesFactory
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.calls.ManagedCallMethods
import io.emeraldpay.dshackle.upstream.ethereum.EthereumRpcUpstream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstream
import io.emeraldpay.dshackle.upstream.ethereum.EthereumWsFactory
import io.emeraldpay.dshackle.upstream.grpc.GrpcUpstreams
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcHttpClient
Expand Down Expand Up @@ -146,7 +143,7 @@ open class ConfiguredUpstreams(
}

val methods = buildMethods(config, chain)
val upstream = BitcoinUpstream(config.id
val upstream = BitcoinRpcUpstream(config.id
?: "bitcoin-${seq.getAndIncrement()}", chain, directApi,
options, config.role,
QuorumForLabels.QuorumItem(1, config.labels),
Expand Down
32 changes: 32 additions & 0 deletions src/main/kotlin/io/emeraldpay/dshackle/startup/QuorumForLabels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,20 @@ class QuorumForLabels() {
return Collections.unmodifiableList(nodes)
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is QuorumForLabels) return false

if (nodes != other.nodes) return false

return true
}

override fun hashCode(): Int {
return nodes.hashCode()
}


/**
* Details for a single element (upstream, node or aggregation)
*/
Expand All @@ -67,6 +81,24 @@ class QuorumForLabels() {
return QuorumItem(0, UpstreamsConfig.Labels())
}
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is QuorumItem) return false

if (quorum != other.quorum) return false
if (labels != other.labels) return false

return true
}

override fun hashCode(): Int {
var result = quorum
result = 31 * result + labels.hashCode()
return result
}


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
*/
package io.emeraldpay.dshackle.upstream

import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.BlockchainType
import io.emeraldpay.dshackle.Global
import io.emeraldpay.dshackle.cache.CachesEnabled
import io.emeraldpay.dshackle.cache.CachesFactory
import io.emeraldpay.dshackle.quorum.QuorumReaderFactory
import io.emeraldpay.dshackle.startup.UpstreamChange
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinMultistream
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinUpstream
import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods
import io.emeraldpay.dshackle.upstream.calls.CallMethods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package io.emeraldpay.dshackle.upstream

import io.emeraldpay.api.proto.BlockchainOuterClass
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.calls.CallMethods
Expand Down Expand Up @@ -46,6 +47,14 @@ abstract class DefaultUpstream(
return getStatus() == UpstreamAvailability.OK
}

fun onStatus(value: BlockchainOuterClass.ChainStatus) {
val available = value.availability
val quorum = value.quorum
setStatus(
if (available != null) UpstreamAvailability.fromGrpc(available.number) else UpstreamAvailability.UNAVAILABLE
)
}

override fun getStatus(): UpstreamAvailability {
return status.get().status
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Copyright (c) 2020 EmeraldPay, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.emeraldpay.dshackle.upstream.bitcoin

import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.*
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.grpc.Chain
import org.slf4j.LoggerFactory
import org.springframework.context.Lifecycle
import reactor.core.Disposable

open class BitcoinRpcUpstream(
id: String,
chain: Chain,
private val directApi: Reader<JsonRpcRequest, JsonRpcResponse>,
options: UpstreamsConfig.Options,
role: UpstreamsConfig.UpstreamRole,
node: QuorumForLabels.QuorumItem,
callMethods: CallMethods
) : BitcoinUpstream(id, chain, options, role, callMethods, node), Lifecycle {

companion object {
private val log = LoggerFactory.getLogger(BitcoinRpcUpstream::class.java)
}

private val head: Head = createHead()
private var validatorSubscription: Disposable? = null

private fun createHead(): Head {
return BitcoinRpcHead(
directApi,
ExtractBlock()
)
}

override fun getHead(): Head {
return head
}

override fun getApi(): Reader<JsonRpcRequest, JsonRpcResponse> {
return directApi
}

override fun getLabels(): Collection<UpstreamsConfig.Labels> {
return listOf(UpstreamsConfig.Labels())
}

override fun <T : Upstream> cast(selfType: Class<T>): T {
if (!selfType.isAssignableFrom(this.javaClass)) {
throw ClassCastException("Cannot cast ${this.javaClass} to $selfType")
}
return this as T
}

override fun isRunning(): Boolean {
var runningAny = validatorSubscription != null
if (head is Lifecycle) {
runningAny = runningAny || head.isRunning
}
return runningAny
}

override fun start() {
log.info("Configured for ${chain.chainName}")
if (head is Lifecycle) {
if (!head.isRunning) {
head.start()
}
}

validatorSubscription?.dispose()

if (getOptions().disableValidation != null && getOptions().disableValidation!!) {
this.setLag(0)
this.setStatus(UpstreamAvailability.OK)
} else {
val validator = BitcoinUpstreamValidator(directApi, getOptions())
validatorSubscription = validator.start()
.subscribe(this::setStatus)
}
}

override fun stop() {
if (head is Lifecycle) {
head.stop()
}
validatorSubscription?.dispose()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,97 +15,30 @@
*/
package io.emeraldpay.dshackle.upstream.bitcoin

import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.config.UpstreamsConfig
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.startup.QuorumForLabels
import io.emeraldpay.dshackle.upstream.*
import io.emeraldpay.dshackle.upstream.DefaultUpstream
import io.emeraldpay.dshackle.upstream.calls.CallMethods
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.emeraldpay.dshackle.upstream.calls.DefaultBitcoinMethods
import io.emeraldpay.grpc.Chain
import org.slf4j.LoggerFactory
import org.springframework.context.Lifecycle
import reactor.core.Disposable
import reactor.core.publisher.Mono

open class BitcoinUpstream(
abstract class BitcoinUpstream(
id: String,
val chain: Chain,
private val directApi: Reader<JsonRpcRequest, JsonRpcResponse>,
options: UpstreamsConfig.Options,
role: UpstreamsConfig.UpstreamRole,
node: QuorumForLabels.QuorumItem,
callMethods: CallMethods
) : DefaultUpstream(id, options, role, callMethods, node), Lifecycle {
callMethods: CallMethods,
node: QuorumForLabels.QuorumItem
) : DefaultUpstream(id, options, role, callMethods, node) {

constructor(id: String,
chain: Chain,
options: UpstreamsConfig.Options,
role: UpstreamsConfig.UpstreamRole) : this(id, chain, options, role, DefaultBitcoinMethods(), QuorumForLabels.QuorumItem.empty())

companion object {
private val log = LoggerFactory.getLogger(BitcoinUpstream::class.java)
}

private val head: Head = createHead()
private var validatorSubscription: Disposable? = null

private fun createHead(): Head {
return BitcoinRpcHead(
directApi,
ExtractBlock()
)
}

override fun getHead(): Head {
return head
}

override fun getApi(): Reader<JsonRpcRequest, JsonRpcResponse> {
return directApi
}

override fun getLabels(): Collection<UpstreamsConfig.Labels> {
return listOf(UpstreamsConfig.Labels())
}

override fun <T : Upstream> cast(selfType: Class<T>): T {
if (!selfType.isAssignableFrom(this.javaClass)) {
throw ClassCastException("Cannot cast ${this.javaClass} to $selfType")
}
return this as T
}

override fun isRunning(): Boolean {
var runningAny = validatorSubscription != null
if (head is Lifecycle) {
runningAny = runningAny || head.isRunning
}
runningAny = runningAny
return runningAny
}

override fun start() {
log.info("Configured for ${chain.chainName}")
if (head is Lifecycle) {
if (!head.isRunning) {
head.start()
}
}

validatorSubscription?.dispose()

if (getOptions().disableValidation != null && getOptions().disableValidation!!) {
this.setLag(0)
this.setStatus(UpstreamAvailability.OK)
} else {
val validator = BitcoinUpstreamValidator(directApi, getOptions())
validatorSubscription = validator.start()
.subscribe(this::setStatus)
}
}

override fun stop() {
if (head is Lifecycle) {
head.stop()
}
validatorSubscription?.dispose()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@
*/
package io.emeraldpay.dshackle.upstream.ethereum

import com.fasterxml.jackson.databind.ObjectMapper
import io.emeraldpay.dshackle.Defaults
import io.emeraldpay.dshackle.data.BlockContainer
import io.emeraldpay.dshackle.reader.Reader
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
import io.infinitape.etherjar.hex.HexQuantity
import io.infinitape.etherjar.rpc.Commands
import org.slf4j.LoggerFactory
import org.springframework.context.Lifecycle
import org.springframework.scheduling.concurrent.CustomizableThreadFactory
Expand Down Expand Up @@ -71,7 +69,7 @@ class EthereumRpcHead(
.timeout(Defaults.timeout, Mono.error(Exception("Block data not received")))
}
.map {
BlockContainer.from(it.getResult())
BlockContainer.fromEthereumJson(it.getResult())
}
.onErrorContinue { err, _ ->
log.debug("RPC error ${err.message}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class EthereumWsFactory(
}
}
.flatMap(JsonRpcResponse::requireResult)
.map { BlockContainer.from(it) }
.map { BlockContainer.fromEthereumJson(it) }
}.repeatWhenEmpty { n ->
Repeat.times<Any>(5)
.exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(500))
Expand Down
Loading

0 comments on commit c3d3d94

Please sign in to comment.