Skip to content
This repository has been archived by the owner on Jan 22, 2019. It is now read-only.

Commit

Permalink
Merge pull request #50 from XLNT/feat/subscription-abstraction
Browse files Browse the repository at this point in the history
feat: abstract new block subscriptions to api
  • Loading branch information
shrugs authored Aug 12, 2018
2 parents 9253fbc + f9d0f93 commit 22434b0
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 828 deletions.
1 change: 0 additions & 1 deletion packages/gnarly-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
"pg": "^7.4.1",
"sequelize": "^4.35.2",
"uuid": "^3.2.1",
"web3": "^1.0.0-beta.29",
"web3-eth-abi": "^1.0.0-beta.34",
"web3-utils": "^1.0.0-beta.34"
},
Expand Down
9 changes: 4 additions & 5 deletions packages/gnarly-core/src/Blockstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class BlockStream {

private onBlockAddedSubscriptionToken
private onBlockRemovedSubscriptionToken
private reconciling
private unsubscribeFromNewBlocks
/**
* Whether or not the blockstreamer is syncing blocks from the past or not
*/
Expand Down Expand Up @@ -118,7 +118,7 @@ class BlockStream {
}

public stop = async () => {
clearInterval(this.reconciling)
this.unsubscribeFromNewBlocks()
if (this.streamer) {
this.streamer.unsubscribeFromOnBlockAdded(this.onBlockAddedSubscriptionToken)
this.streamer.unsubscribeFromOnBlockRemoved(this.onBlockRemovedSubscriptionToken)
Expand Down Expand Up @@ -163,10 +163,9 @@ class BlockStream {
}

private beginTracking = () => {
// @TODO - replace this with a filter
this.reconciling = setInterval(async () => {
this.unsubscribeFromNewBlocks = globalState.api.subscribeToNewBlocks(async () => {
await this.streamer.reconcileNewBlock(await globalState.api.getLatestBlock())
}, this.interval)
})
}
}

Expand Down
20 changes: 14 additions & 6 deletions packages/gnarly-core/src/ingestion/IngestApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { IJSONLog } from '../models/Log'

import { IJSONInternalTransaction } from '../models/InternalTransaction'

export type DecomposeFn = () => void

export default interface IIngestApi {
/**
* gets a block by number
Expand All @@ -25,18 +27,18 @@ export default interface IIngestApi {
getBlockByHash: (hash: string) => Promise<IJSONBlock>

/**
* gets logs from filter options
* @returns IJSONLog[]
* get the latest block
* @returns IJSONBlock
* @throws
*/
getLogs: (filterOptions: FilterOptions) => Promise<IJSONLog[]>
getLatestBlock: () => Promise<IJSONBlock>

/**
* get the latest block
* @returns IJSONBlock
* gets logs from filter options
* @returns IJSONLog[]
* @throws
*/
getLatestBlock: () => Promise<IJSONBlock>
getLogs: (filterOptions: FilterOptions) => Promise<IJSONLog[]>

/**
* gets tx receipt
Expand All @@ -51,4 +53,10 @@ export default interface IIngestApi {
* @throws
*/
traceTransaction: (hash: string) => Promise<IJSONInternalTransaction[]>

/**
* subscribes to new atomic events (blocks, when using a blockchain)
* @returns decomposer fn
*/
subscribeToNewBlocks: (cb) => DecomposeFn
}
22 changes: 17 additions & 5 deletions packages/gnarly-core/src/ingestion/Web3Api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import {

export default class Web3Api implements IIngestApi {

private longpollBlocksInterval

private doFetch = cacheApiRequest(
(method: string, params: any[] = []) => pRetry(
async () => {
Expand Down Expand Up @@ -69,21 +71,31 @@ export default class Web3Api implements IIngestApi {
return this.doFetch('eth_getBlockByHash', [hash, true])
}

public getLogs = async (filterOptions: FilterOptions): Promise<IJSONLog[]> => {
debug('[getLogs] %j', filterOptions)
return this.doFetch('eth_getLogs', [filterOptions])
}

public getLatestBlock = async (): Promise<IJSONBlock> => {
debug('[getLatestBlock]')
return this.doFetch('eth_getBlockByNumber', ['latest', true])
}

public getLogs = async (filterOptions: FilterOptions): Promise<IJSONLog[]> => {
debug('[getLogs] %j', filterOptions)
return this.doFetch('eth_getLogs', [filterOptions])
}

public getTransactionReceipt = async (hash: string): Promise<IJSONExternalTransactionReceipt> => {
return this.doFetch('eth_getTransactionReceipt', [hash])
}

public traceTransaction = async (hash: string): Promise<IJSONInternalTransaction[]> => {
return (await this.doFetch('trace_replayTransaction', [hash, ['trace']])).trace
}

public subscribeToNewBlocks = (cb) => {
// @TODO - https://github.com/XLNT/gnarly/issues/30
// but web3 subscriptions have a lot of issues with them as well
// longpoll timer
this.longpollBlocksInterval = setInterval(cb, 5000)
return () => {
clearInterval(this.longpollBlocksInterval)
}
}
}
13 changes: 12 additions & 1 deletion packages/gnarly-core/test/mocks/MockIngestApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
FilterOptions,
} from 'ethereumjs-blockstream'

import IIngestApi from '../../src/ingestion/IngestApi'
import IIngestApi, { DecomposeFn } from '../../src/ingestion/IngestApi'
import { IJSONBlock } from '../../src/models/Block'
import { IJSONExternalTransactionReceipt } from '../../src/models/ExternalTransaction'
import { IJSONInternalTransaction } from '../../src/models/InternalTransaction'
Expand All @@ -16,6 +16,8 @@ import IJSONLogFactory from '../factories/IJSONLogFactory'

export default class MockIngestApi implements IIngestApi {

private cb

constructor (
private numLogs = 4,
private numInternalTxs = 4,
Expand Down Expand Up @@ -46,4 +48,13 @@ export default class MockIngestApi implements IIngestApi {
public traceTransaction = (hash: string): Promise<IJSONInternalTransaction[]> => {
return IJSONInternalTransactionFactory.buildList(this.numInternalTxs, { hash })
}

public subscribeToNewBlocks = (cb): DecomposeFn => {
this.cb = cb
return this.decompose
}

public decompose = () => {
//
}
}
Loading

0 comments on commit 22434b0

Please sign in to comment.