From cbff4a9092e8667307da2d19cdbbf3833e1d9ed5 Mon Sep 17 00:00:00 2001 From: Igor Artamonov Date: Mon, 9 Oct 2023 20:55:57 -0400 Subject: [PATCH] problem: doesn't fetch Bitcoin balance when provided by another Dshackle instance --- .../dshackle/rpc/TrackBitcoinAddress.kt | 2 +- .../dshackle/upstream/DefaultUpstream.kt | 6 ++ .../upstream/bitcoin/RemoteUnspentReader.kt | 29 ++++++-- .../bitcoin/RemoteUnspentReaderTest.kt | 74 +++++++++++++++++++ 4 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 src/test/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReaderTest.kt diff --git a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt index 5c35d12bf..f7760e8f0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackBitcoinAddress.kt @@ -78,7 +78,7 @@ class TrackBitcoinAddress( multistreamHolder.observeChains().subscribe { chain -> multistreamHolder.getUpstream(chain)?.let { mup -> val available = mup.getAll().any { up -> - !up.isGrpc() && up.getCapabilities().contains(Capability.BALANCE) + up.getCapabilities().contains(Capability.BALANCE) } setBalanceAvailability(chain, available) } diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt index 1da303353..c3ec36eb0 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/DefaultUpstream.kt @@ -86,6 +86,12 @@ abstract class DefaultUpstream( throw IllegalArgumentException("Invalid upstream id: $id") } + if (options.disableValidation) { + // if we specifically told that this upstream should be _always valid_ start with this state, + // but note it could be updated later (ex. provided by gRPC upstream) + this.setStatus(UpstreamAvailability.OK) + } + forkWatch.register(this) .subscribeOn(Schedulers.boundedElastic()) .subscribe { diff --git a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt index 60d20209a..b8e6a05fb 100644 --- a/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt +++ b/src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReader.kt @@ -1,6 +1,10 @@ package io.emeraldpay.dshackle.upstream.bitcoin import io.emeraldpay.api.proto.BlockchainOuterClass.BalanceRequest +import io.emeraldpay.api.proto.Common +import io.emeraldpay.api.proto.Common.AnyAddress +import io.emeraldpay.api.proto.Common.SingleAddress +import io.emeraldpay.dshackle.SilentException import io.emeraldpay.dshackle.upstream.Capability import io.emeraldpay.dshackle.upstream.Selector import io.emeraldpay.dshackle.upstream.bitcoin.data.SimpleUnspent @@ -26,14 +30,15 @@ class RemoteUnspentReader( val apis = upstreams.getApiSource(selector) apis.request(1) return Mono.from(apis) - .map { up -> - up.cast(BitcoinGrpcUpstream::class.java).remote - } - .flatMapMany { - val request = BalanceRequest.newBuilder() - .build() - it.getBalance(request) - } + .map { up -> up.cast(BitcoinGrpcUpstream::class.java) } + .flatMap { readFromUpstream(it, key) } + } + + fun readFromUpstream(upstream: BitcoinGrpcUpstream, address: Address): Mono> { + val request = createRequest(address) + return upstream.remote.getBalance(request) + .switchIfEmpty(Mono.error(SilentException.DataUnavailable("Balance not provider"))) + .doOnError { t -> log.warn("Failed to get balance from remote", t) } .map { resp -> resp.utxoList.map { utxo -> SimpleUnspent( @@ -45,4 +50,12 @@ class RemoteUnspentReader( } .reduce(List::plus) } + + fun createRequest(address: Address): BalanceRequest { + return BalanceRequest.newBuilder() + .setAddress(AnyAddress.newBuilder().setAddressSingle(SingleAddress.newBuilder().setAddress(address.toString()))) + .setAsset(Common.Asset.newBuilder().setChainValue(upstreams.chain.id)) + .setIncludeUtxo(true) + .build() + } } diff --git a/src/test/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReaderTest.kt b/src/test/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReaderTest.kt new file mode 100644 index 000000000..cea966537 --- /dev/null +++ b/src/test/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/RemoteUnspentReaderTest.kt @@ -0,0 +1,74 @@ +package io.emeraldpay.dshackle.upstream.bitcoin + +import io.emeraldpay.api.Chain +import io.emeraldpay.api.proto.BlockchainOuterClass.AddressBalance +import io.emeraldpay.api.proto.BlockchainOuterClass.BalanceRequest +import io.emeraldpay.api.proto.BlockchainOuterClass.Utxo +import io.emeraldpay.api.proto.Common +import io.emeraldpay.dshackle.upstream.grpc.BitcoinGrpcUpstream +import io.kotest.core.spec.style.ShouldSpec +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.shouldBe +import io.mockk.every +import io.mockk.mockk +import org.bitcoinj.core.Address +import org.bitcoinj.params.MainNetParams +import reactor.core.publisher.Flux +import java.time.Duration + +class RemoteUnspentReaderTest : ShouldSpec({ + + should("Create a request") { + val ups = mockk() + every { ups.chain } returns Chain.BITCOIN + val reader = RemoteUnspentReader(ups) + + val act = reader.createRequest(Address.fromString(MainNetParams.get(), "38XnPvu9PmonFU9WouPXUjYbW91wa5MerL")) + + act.asset.chain.name shouldBe "CHAIN_BITCOIN" + act.includeUtxo shouldBe true + act.address.addrTypeCase shouldBe Common.AnyAddress.AddrTypeCase.ADDRESS_SINGLE + act.address.addressSingle.address shouldBe "38XnPvu9PmonFU9WouPXUjYbW91wa5MerL" + } + + should("Read from remote") { + val ups = mockk() + every { ups.chain } returns Chain.BITCOIN + val reader = RemoteUnspentReader(ups) + + val up = mockk() + every { up.remote } returns mockk() { + every { getBalance(any() as BalanceRequest) } returns Flux.fromIterable( + listOf( + AddressBalance.newBuilder() + .addAllUtxo( + listOf( + Utxo.newBuilder() + .setTxId("cb46a01a257194ecf7d6a1f7e1bee8ac4f0a6687ec13bb0bba8942377b64a6c4") + .setIndex(0) + .setBalance("102030") + .build(), + Utxo.newBuilder() + .setTxId("c742a3e4257194ecf7d6a1f7e1bee8ac4b066a51ec13bb0bba8942377b64a6c4") + .setIndex(1) + .setBalance("123") + .build() + ) + ) + .build() + ) + ) + } + + val act = reader.readFromUpstream(up, Address.fromString(MainNetParams.get(), "38XnPvu9PmonFU9WouPXUjYbW91wa5MerL")) + .block(Duration.ofSeconds(1)) + + act shouldHaveSize 2 + act[0].txid shouldBe "cb46a01a257194ecf7d6a1f7e1bee8ac4f0a6687ec13bb0bba8942377b64a6c4" + act[0].vout shouldBe 0 + act[0].value shouldBe 102030 + act[1].txid shouldBe "c742a3e4257194ecf7d6a1f7e1bee8ac4b066a51ec13bb0bba8942377b64a6c4" + act[1].vout shouldBe 1 + act[1].value shouldBe 123 + } +})