diff --git a/runner/package.json b/runner/package.json index 87b21b967..92ae713bd 100644 --- a/runner/package.json +++ b/runner/package.json @@ -63,6 +63,7 @@ "express": "^4.18.2", "graphql": "^16.8.1", "long": "^5.2.3", + "near-api-js": "^4.0.2", "node-fetch": "^2.6.11", "node-sql-parser": "^5.0.0", "pg": "^8.11.1", diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 344deae86..b537ceb23 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -13,6 +13,8 @@ import type IndexerConfig from '../indexer-config'; import { type PostgresConnectionParams } from '../pg-client'; import IndexerMeta, { IndexerStatus } from '../indexer-meta'; import { wrapSpan } from '../utility'; +import { type IRpcClient } from '../rpc-client/rpc-client'; +import { type CodeResult } from '@near-js/types/lib/provider/response'; interface Dependencies { fetch: typeof fetch @@ -20,7 +22,13 @@ interface Dependencies { dmlHandler?: DmlHandler indexerMeta?: IndexerMeta parser: Parser -}; + rpcClient?: IRpcClient +} + +interface IRpcContext { + viewCallRaw: (contractId: string, methodName: string, args: Record) => Promise + viewCallJSON: (contractId: string, methodName: string, args: Record) => Promise +} interface Context { graphql: (operation: string, variables?: Record) => Promise @@ -31,6 +39,7 @@ interface Context { error: (message: string) => void fetchFromSocialApi: (path: string, options?: any) => Promise db: Record any>> + rpc: IRpcContext } export interface TableDefinitionNames { @@ -200,7 +209,8 @@ export default class Indexer { fetchFromSocialApi: async (path, options) => { return await this.deps.fetch(`https://api.near.social${path}`, options); }, - db: this.buildDatabaseContext(blockHeight, logEntries) + db: this.buildDatabaseContext(blockHeight, logEntries), + rpc: this.buildRPCContext(blockHeight) }; } @@ -277,6 +287,25 @@ export default class Indexer { return pascalCaseTableName; } + buildRPCContext ( + currentBlockHeight: number, + ): IRpcContext { + const rpcClient = (): IRpcClient => { + if (!this.deps.rpcClient) { + throw new Error('RPC client is not configured'); + } + return this.deps.rpcClient; + }; + return { + viewCallRaw: async (contractId: string, methodName: string, args: Record = {}, blockHeight = currentBlockHeight): Promise => { + return await rpcClient().viewCallRaw(blockHeight, contractId, methodName, args); + }, + viewCallJSON: async (contractId: string, methodName: string, args: Record = {}, blockHeight = currentBlockHeight): Promise => { + return await rpcClient().viewCallJSON(blockHeight, contractId, methodName, args); + } + }; + } + buildDatabaseContext ( blockHeight: number, logEntries: LogEntry[], diff --git a/runner/src/rpc-client/rpc-client.test.ts b/runner/src/rpc-client/rpc-client.test.ts new file mode 100644 index 000000000..08b2d7514 --- /dev/null +++ b/runner/src/rpc-client/rpc-client.test.ts @@ -0,0 +1,25 @@ +import RpcClient from './rpc-client'; + +describe('RPCClient unit tests', () => { + const rpcClient = RpcClient.fromConfig({ + networkId: 'mainnet', + nodeUrl: 'https://beta.rpc.mainnet.near.org', + }); + const testBlockHeight = 121_031_955; + + it('Should make a get_total_staked_balance view call to pool.near', async () => { + const response = await rpcClient.viewCallJSON(testBlockHeight, 'epic.poolv1.near', 'get_total_staked_balance', {}); + console.log(response); + expect(response).toBeDefined(); + }); + + it('Should return non-empty dataplatform.near.list_by_account', async () => { + const response = await rpcClient.viewCallJSON(testBlockHeight, 'queryapi.dataplatform.near', 'list_by_account', { account_id: 'dataplatform.near' }); + expect(Object.keys(response).length).toBeGreaterThanOrEqual(0); + }, 30_000); + + it('Should get_contracts_metadata from sputnik-dao.near', async () => { + const response = await rpcClient.viewCallJSON(testBlockHeight, 'sputnik-dao.near', 'get_contracts_metadata', {}); + expect(response.length).toBeGreaterThanOrEqual(3); + }); +}); diff --git a/runner/src/rpc-client/rpc-client.ts b/runner/src/rpc-client/rpc-client.ts new file mode 100644 index 000000000..f8ca33392 --- /dev/null +++ b/runner/src/rpc-client/rpc-client.ts @@ -0,0 +1,53 @@ +import { type NearConfig } from '@near-js/wallet-account/lib/near'; +import { connect, type Near } from 'near-api-js'; +import { type CodeResult } from '@near-js/types/lib/provider/response'; + +type RpcViewCallArgs = Record; + +export interface IRpcClient { + viewCallRaw: (blockHeight: number, contractId: string, methodName: string, args: RpcViewCallArgs) => Promise + viewCallJSON: (blockHeight: number, contractId: string, methodName: string, args: RpcViewCallArgs) => Promise +} + +export default class RpcClient implements IRpcClient { + #near: Near | undefined; + + private constructor (private readonly config: NearConfig) {} + + async nearConnection (): Promise { + if (!this.#near) { + this.#near = await connect(this.config); + } + return this.#near; + } + + async viewCallRaw (blockHeight: number, contractId: string, methodName: string, args: RpcViewCallArgs = {}): Promise { + const near = await this.nearConnection(); + return await near.connection.provider.query({ + request_type: 'call_function', + blockId: blockHeight, + account_id: contractId, + method_name: methodName, + args_base64: Buffer.from(JSON.stringify(args)).toString('base64'), + }); + } + + async viewCallJSON (blockHeight: number, contractId: string, methodName: string, args: RpcViewCallArgs = {}): Promise { + const response: CodeResult = await this.viewCallRaw(blockHeight, contractId, methodName, args); + return JSON.parse(Buffer.from(response.result).toString('ascii')); + } + + static fromConfig (config: NearConfig): IRpcClient { + return new RpcClient(config); + } + + static fromEnv (): IRpcClient { + if (!process.env.RPC_URL) { + throw new Error('Missing RPC_URL env var for RpcClient'); + } + return RpcClient.fromConfig({ + networkId: 'mainnet', + nodeUrl: process.env.RPC_URL, + }); + } +} diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 6be274822..e36a0d340 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -13,6 +13,7 @@ import { IndexerStatus } from '../indexer-meta/indexer-meta'; import IndexerConfig from '../indexer-config'; import parentLogger from '../logger'; import { wrapSpan } from '../utility'; +import RpcClient from '../rpc-client/rpc-client'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); @@ -96,7 +97,9 @@ async function blockQueueProducer (workerContext: WorkerContext): Promise async function blockQueueConsumer (workerContext: WorkerContext): Promise { let previousError: string = ''; const indexerConfig: IndexerConfig = workerContext.indexerConfig; - const indexer = new Indexer(indexerConfig); + const indexer = new Indexer(indexerConfig, { + rpcClient: RpcClient.fromEnv() + }); let streamMessageId = ''; let currBlockHeight = 0; diff --git a/runner/tests/integration.test.ts b/runner/tests/integration.test.ts index f7be5ef1b..c122927e2 100644 --- a/runner/tests/integration.test.ts +++ b/runner/tests/integration.test.ts @@ -13,6 +13,7 @@ import block_115185108 from './blocks/00115185108/streamer_message.json'; import block_115185109 from './blocks/00115185109/streamer_message.json'; import { LogLevel } from '../src/indexer-meta/log-entry'; import IndexerConfig from '../src/indexer-config'; +import RpcClient, { type IRpcClient } from '../src/rpc-client/rpc-client'; describe('Indexer integration', () => { jest.setTimeout(300_000); @@ -25,6 +26,7 @@ describe('Indexer integration', () => { let postgresContainer: StartedPostgreSqlContainer; let hasuraContainer: StartedHasuraGraphQLContainer; let graphqlClient: GraphQLClient; + let rpcClient: IRpcClient; beforeEach(async () => { hasuraClient = new HasuraClient({}, { @@ -56,6 +58,11 @@ describe('Indexer integration', () => { pgBouncerPort: Number(postgresContainer.getPort()), } ); + + rpcClient = RpcClient.fromConfig({ + networkId: 'mainnet', + nodeUrl: 'https://beta.rpc.mainnet.near.org', + }); }); beforeAll(async () => { @@ -117,7 +124,8 @@ describe('Indexer integration', () => { const indexer = new Indexer( indexerConfig, { - provisioner + provisioner, + rpcClient }, undefined, {