Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Begin process of implementing Cosmos RPC #1368

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 72 additions & 25 deletions packages/iov-cosmos/src/cosmosconnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,21 @@ import {
TransactionId,
TransactionQuery,
TransactionState,
TxReadCodec,
UnsignedTransaction,
} from "@iov/bcp";
import { Encoding, Uint53 } from "@iov/encoding";
import { DefaultValueProducer, ValueAndUpdates } from "@iov/stream";
import { concat, DefaultValueProducer, dropDuplicates, fromListPromise, ValueAndUpdates } from "@iov/stream";
import { Client as TendermintClient } from "@iov/tendermint-rpc";
import equal from "fast-deep-equal";
import { ReadonlyDate } from "readonly-date";
import { Stream } from "xstream";

import { CosmosBech32Prefix, pubkeyToAddress } from "./address";
import { Caip5 } from "./caip5";
import { cosmosCodec } from "./cosmoscodec";
import { decodeAmount, parseTxsResponse } from "./decode";
import { RestClient, TxsResponse } from "./restclient";
import { TxsResponse } from "./restclient";

const { fromBase64 } = Encoding;

Expand Down Expand Up @@ -71,17 +74,18 @@ function buildQueryString({

export class CosmosConnection implements BlockchainConnection {
public static async establish(url: string): Promise<CosmosConnection> {
const restClient = new RestClient(url);
const chainData = await this.initialize(restClient);
return new CosmosConnection(restClient, chainData);
const tmClient = await TendermintClient.connect(url);
const chainData = await this.initialize(tmClient);
return new CosmosConnection(tmClient, cosmosCodec, chainData);
}

private static async initialize(restClient: RestClient): Promise<ChainData> {
const { node_info } = await restClient.nodeInfo();
return { chainId: Caip5.encode(node_info.network) };
private static async initialize(tmClient: TendermintClient): Promise<ChainData> {
const { nodeInfo } = await tmClient.status();
return { chainId: Caip5.encode(nodeInfo.network) };
}

private readonly restClient: RestClient;
private readonly tmClient: TendermintClient;
private readonly codec: TxReadCodec;
private readonly chainData: ChainData;
private readonly primaryToken: Token;
private readonly supportedTokens: readonly Token[];
Expand All @@ -90,8 +94,9 @@ export class CosmosConnection implements BlockchainConnection {
return "cosmos";
}

private constructor(restClient: RestClient, chainData: ChainData) {
this.restClient = restClient;
private constructor(tmClient: TendermintClient, codec: TxReadCodec, chainData: ChainData) {
this.tmClient = tmClient;
this.codec = codec;
this.chainData = chainData;
this.primaryToken = {
fractionalDigits: 6,
Expand All @@ -110,7 +115,7 @@ export class CosmosConnection implements BlockchainConnection {
}

public async height(): Promise<number> {
const { block_meta } = await this.restClient.blocksLatest();
const { block_meta } = await this.tmClient.blocksLatest();
return block_meta.header.height;
}

Expand All @@ -124,7 +129,7 @@ export class CosmosConnection implements BlockchainConnection {

public async getAccount(query: AccountQuery): Promise<Account | undefined> {
const address = isPubkeyQuery(query) ? pubkeyToAddress(query.pubkey, this.prefix) : query.address;
const { result } = await this.restClient.authAccounts(address);
const { result } = await this.tmClient.authAccounts(address);
const account = result.value;
const supportedCoins = account.coins.filter(({ denom }) =>
this.supportedTokens.find(
Expand All @@ -143,13 +148,23 @@ export class CosmosConnection implements BlockchainConnection {
};
}

public watchAccount(_account: AccountQuery): Stream<Account | undefined> {
throw new Error("not implemented");
public watchAccount(query: AccountQuery): Stream<Account | undefined> {
const address = isPubkeyQuery(query)
? this.codec.identityToAddress({ chainId: this.chainId(), pubkey: query.pubkey })
: query.address;

return concat(
Stream.fromPromise(this.getAccount(query)),
this.tmClient
.subscribeTx(buildQueryString({ sentFromOrTo: address }))
.map(() => Stream.fromPromise(this.getAccount(query)))
.flatten(),
);
}

public async getNonce(query: AddressQuery | PubkeyQuery): Promise<Nonce> {
const address = isPubkeyQuery(query) ? pubkeyToAddress(query.pubkey, this.prefix) : query.address;
const { result } = await this.restClient.authAccounts(address);
const { result } = await this.tmClient.authAccounts(address);
const account = result.value;
return parseInt(account.sequence, 10) as Nonce;
}
Expand All @@ -164,7 +179,7 @@ export class CosmosConnection implements BlockchainConnection {
}

public async getBlockHeader(height: number): Promise<BlockHeader> {
const { block_meta } = await this.restClient.blocks(height);
const { block_meta } = await this.tmClient.blocks(height);
return {
id: block_meta.block_id.hash as BlockId,
height: block_meta.header.height,
Expand All @@ -181,7 +196,7 @@ export class CosmosConnection implements BlockchainConnection {
id: TransactionId,
): Promise<ConfirmedAndSignedTransaction<UnsignedTransaction> | FailedTransaction> {
try {
const response = await this.restClient.txsById(id);
const response = await this.tmClient.txsById(id);
const chainId = await this.chainId();
return this.parseAndPopulateTxResponse(response, chainId);
} catch (error) {
Expand All @@ -193,7 +208,7 @@ export class CosmosConnection implements BlockchainConnection {
}

public async postTx(tx: PostableBytes): Promise<PostTxResponse> {
const { txhash, raw_log } = await this.restClient.postTx(tx);
const { txhash, raw_log } = await this.tmClient.postTx(tx);
const transactionId = txhash as TransactionId;
const firstEvent: BlockInfo = { state: TransactionState.Pending };
let blockInfoInterval: NodeJS.Timeout;
Expand Down Expand Up @@ -236,20 +251,52 @@ export class CosmosConnection implements BlockchainConnection {
): Promise<readonly (ConfirmedTransaction<UnsignedTransaction> | FailedTransaction)[]> {
const queryString = buildQueryString(query);
const chainId = this.chainId();
const { txs: responses } = await this.restClient.txs(queryString);
const { txs: responses } = await this.tmClient.txs(queryString);
return Promise.all(responses.map(response => this.parseAndPopulateTxResponse(response, chainId)));
}

public listenTx(
_query: TransactionQuery,
query: TransactionQuery,
): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction> {
throw new Error("not implemented");
const chainId = this.chainId();
const rawQuery = buildQueryString(query);
return this.tmClient.subscribeTx(rawQuery).map((transaction):
| ConfirmedTransaction<UnsignedTransaction>
| FailedTransaction => {
const transactionId = Encoding.toHex(transaction.hash).toUpperCase() as TransactionId;

if (transaction.result.code === 0) {
return {
height: transaction.height,
confirmations: 1, // assuming block height is current height when listening to events
transactionId: transactionId,
log: transaction.result.log,
result: transaction.result.data,
...this.codec.parseBytes((transaction.tx as Uint8Array) as PostableBytes, chainId),
};
} else {
const failed: FailedTransaction = {
height: transaction.height,
transactionId: transactionId,
code: transaction.result.code,
message: transaction.result.log,
};
return failed;
}
});
}

public liveTx(
_query: TransactionQuery,
query: TransactionQuery,
): Stream<ConfirmedTransaction<UnsignedTransaction> | FailedTransaction> {
throw new Error("not implemented");
const pendingSearchResults = this.searchTx(query).then(results =>
results.map((tx): ConfirmedTransaction<UnsignedTransaction> | FailedTransaction => tx),
);
const historyStream = fromListPromise(pendingSearchResults);
const updatesStream = this.listenTx(query);
const combinedStream = concat(historyStream, updatesStream);
const deduplicatedStream = combinedStream.compose(dropDuplicates(ct => ct.transactionId));
return deduplicatedStream;
}

public async getFeeQuote(tx: UnsignedTransaction): Promise<Fee> {
Expand Down Expand Up @@ -278,7 +325,7 @@ export class CosmosConnection implements BlockchainConnection {
chainId: ChainId,
): Promise<ConfirmedAndSignedTransaction<UnsignedTransaction> | FailedTransaction> {
const sender = (response.tx.value as any).msg[0].value.from_address;
const accountForHeight = await this.restClient.authAccounts(sender, response.height);
const accountForHeight = await this.tmClient.authAccounts(sender, response.height);
const nonce = (parseInt(accountForHeight.result.value.sequence, 10) - 1) as Nonce;
return parseTxsResponse(chainId, parseInt(response.height, 10), nonce, response);
}
Expand Down