From c88e820d44e991904736dd4d6e1b337614ffdf4c Mon Sep 17 00:00:00 2001 From: willclarktech Date: Wed, 15 Jan 2020 13:19:04 +0100 Subject: [PATCH] WIP: Begin process of implementing Cosmos RPC --- packages/iov-cosmos/src/cosmosconnection.ts | 97 +++++++++++++++------ 1 file changed, 72 insertions(+), 25 deletions(-) diff --git a/packages/iov-cosmos/src/cosmosconnection.ts b/packages/iov-cosmos/src/cosmosconnection.ts index 2cad6de13..8165bb850 100644 --- a/packages/iov-cosmos/src/cosmosconnection.ts +++ b/packages/iov-cosmos/src/cosmosconnection.ts @@ -26,18 +26,21 @@ import { TransactionId, TransactionQuery, TransactionState, + TxReadCodec, UnsignedTransaction, } from "@iov/bcp"; import { Encoding, Uint53 } from "@iov/encoding"; -import { DefaultValueProducer, ValueAndUpdates } from "@iov/stream"; +import { concat, DefaultValueProducer, dropDuplicates, fromListPromise, ValueAndUpdates } from "@iov/stream"; +import { Client as TendermintClient } from "@iov/tendermint-rpc"; import equal from "fast-deep-equal"; import { ReadonlyDate } from "readonly-date"; import { Stream } from "xstream"; import { CosmosBech32Prefix, pubkeyToAddress } from "./address"; import { Caip5 } from "./caip5"; +import { cosmosCodec } from "./cosmoscodec"; import { decodeAmount, parseTxsResponse } from "./decode"; -import { RestClient, TxsResponse } from "./restclient"; +import { TxsResponse } from "./restclient"; const { fromBase64 } = Encoding; @@ -71,17 +74,18 @@ function buildQueryString({ export class CosmosConnection implements BlockchainConnection { public static async establish(url: string): Promise { - const restClient = new RestClient(url); - const chainData = await this.initialize(restClient); - return new CosmosConnection(restClient, chainData); + const tmClient = await TendermintClient.connect(url); + const chainData = await this.initialize(tmClient); + return new CosmosConnection(tmClient, cosmosCodec, chainData); } - private static async initialize(restClient: RestClient): Promise { - const { node_info } = await restClient.nodeInfo(); - return { chainId: Caip5.encode(node_info.network) }; + private static async initialize(tmClient: TendermintClient): Promise { + const { nodeInfo } = await tmClient.status(); + return { chainId: Caip5.encode(nodeInfo.network) }; } - private readonly restClient: RestClient; + private readonly tmClient: TendermintClient; + private readonly codec: TxReadCodec; private readonly chainData: ChainData; private readonly primaryToken: Token; private readonly supportedTokens: readonly Token[]; @@ -90,8 +94,9 @@ export class CosmosConnection implements BlockchainConnection { return "cosmos"; } - private constructor(restClient: RestClient, chainData: ChainData) { - this.restClient = restClient; + private constructor(tmClient: TendermintClient, codec: TxReadCodec, chainData: ChainData) { + this.tmClient = tmClient; + this.codec = codec; this.chainData = chainData; this.primaryToken = { fractionalDigits: 6, @@ -110,7 +115,7 @@ export class CosmosConnection implements BlockchainConnection { } public async height(): Promise { - const { block_meta } = await this.restClient.blocksLatest(); + const { block_meta } = await this.tmClient.blocksLatest(); return block_meta.header.height; } @@ -124,7 +129,7 @@ export class CosmosConnection implements BlockchainConnection { public async getAccount(query: AccountQuery): Promise { const address = isPubkeyQuery(query) ? pubkeyToAddress(query.pubkey, this.prefix) : query.address; - const { result } = await this.restClient.authAccounts(address); + const { result } = await this.tmClient.authAccounts(address); const account = result.value; const supportedCoins = account.coins.filter(({ denom }) => this.supportedTokens.find( @@ -143,13 +148,23 @@ export class CosmosConnection implements BlockchainConnection { }; } - public watchAccount(_account: AccountQuery): Stream { - throw new Error("not implemented"); + public watchAccount(query: AccountQuery): Stream { + const address = isPubkeyQuery(query) + ? this.codec.identityToAddress({ chainId: this.chainId(), pubkey: query.pubkey }) + : query.address; + + return concat( + Stream.fromPromise(this.getAccount(query)), + this.tmClient + .subscribeTx(buildQueryString({ sentFromOrTo: address })) + .map(() => Stream.fromPromise(this.getAccount(query))) + .flatten(), + ); } public async getNonce(query: AddressQuery | PubkeyQuery): Promise { const address = isPubkeyQuery(query) ? pubkeyToAddress(query.pubkey, this.prefix) : query.address; - const { result } = await this.restClient.authAccounts(address); + const { result } = await this.tmClient.authAccounts(address); const account = result.value; return parseInt(account.sequence, 10) as Nonce; } @@ -164,7 +179,7 @@ export class CosmosConnection implements BlockchainConnection { } public async getBlockHeader(height: number): Promise { - const { block_meta } = await this.restClient.blocks(height); + const { block_meta } = await this.tmClient.blocks(height); return { id: block_meta.block_id.hash as BlockId, height: block_meta.header.height, @@ -181,7 +196,7 @@ export class CosmosConnection implements BlockchainConnection { id: TransactionId, ): Promise | FailedTransaction> { try { - const response = await this.restClient.txsById(id); + const response = await this.tmClient.txsById(id); const chainId = await this.chainId(); return this.parseAndPopulateTxResponse(response, chainId); } catch (error) { @@ -193,7 +208,7 @@ export class CosmosConnection implements BlockchainConnection { } public async postTx(tx: PostableBytes): Promise { - const { txhash, raw_log } = await this.restClient.postTx(tx); + const { txhash, raw_log } = await this.tmClient.postTx(tx); const transactionId = txhash as TransactionId; const firstEvent: BlockInfo = { state: TransactionState.Pending }; let blockInfoInterval: NodeJS.Timeout; @@ -236,20 +251,52 @@ export class CosmosConnection implements BlockchainConnection { ): Promise | FailedTransaction)[]> { const queryString = buildQueryString(query); const chainId = this.chainId(); - const { txs: responses } = await this.restClient.txs(queryString); + const { txs: responses } = await this.tmClient.txs(queryString); return Promise.all(responses.map(response => this.parseAndPopulateTxResponse(response, chainId))); } public listenTx( - _query: TransactionQuery, + query: TransactionQuery, ): Stream | FailedTransaction> { - throw new Error("not implemented"); + const chainId = this.chainId(); + const rawQuery = buildQueryString(query); + return this.tmClient.subscribeTx(rawQuery).map((transaction): + | ConfirmedTransaction + | FailedTransaction => { + const transactionId = Encoding.toHex(transaction.hash).toUpperCase() as TransactionId; + + if (transaction.result.code === 0) { + return { + height: transaction.height, + confirmations: 1, // assuming block height is current height when listening to events + transactionId: transactionId, + log: transaction.result.log, + result: transaction.result.data, + ...this.codec.parseBytes((transaction.tx as Uint8Array) as PostableBytes, chainId), + }; + } else { + const failed: FailedTransaction = { + height: transaction.height, + transactionId: transactionId, + code: transaction.result.code, + message: transaction.result.log, + }; + return failed; + } + }); } public liveTx( - _query: TransactionQuery, + query: TransactionQuery, ): Stream | FailedTransaction> { - throw new Error("not implemented"); + const pendingSearchResults = this.searchTx(query).then(results => + results.map((tx): ConfirmedTransaction | FailedTransaction => tx), + ); + const historyStream = fromListPromise(pendingSearchResults); + const updatesStream = this.listenTx(query); + const combinedStream = concat(historyStream, updatesStream); + const deduplicatedStream = combinedStream.compose(dropDuplicates(ct => ct.transactionId)); + return deduplicatedStream; } public async getFeeQuote(tx: UnsignedTransaction): Promise { @@ -278,7 +325,7 @@ export class CosmosConnection implements BlockchainConnection { chainId: ChainId, ): Promise | FailedTransaction> { const sender = (response.tx.value as any).msg[0].value.from_address; - const accountForHeight = await this.restClient.authAccounts(sender, response.height); + const accountForHeight = await this.tmClient.authAccounts(sender, response.height); const nonce = (parseInt(accountForHeight.result.value.sequence, 10) - 1) as Nonce; return parseTxsResponse(chainId, parseInt(response.height, 10), nonce, response); }