Skip to content
This repository has been archived by the owner on Jan 22, 2019. It is now read-only.

Commit

Permalink
Merge pull request #47 from nionis/feat/block-retention
Browse files Browse the repository at this point in the history
implement block-retention
  • Loading branch information
shrugs authored Aug 12, 2018
2 parents 22434b0 + 16e8a44 commit d66431d
Show file tree
Hide file tree
Showing 44 changed files with 1,872 additions and 706 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ before_script:
after_success:
- yarn run coverage
script:
- yarn run lint
- yarn run test
notifications:
slack:
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ cd gnarly
# install yarn if you haven't already
# $ npm i -g yarn

# install lerna
yarn global add lerna
# install workspace dependencies, which includes lerna
yarn install

# boostrap the packages within this project (install deps, linking, etc)
lerna bootstrap

# now this command should pass:
yarn run build-ts
```

Now you should be able to run the tests with
Expand Down
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"lerna": "2.11.0",
"lerna": "3.0.3",
"packages": [
"packages/*"
],
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@
"private": true,
"version": "0.0.4-alpha.1",
"scripts": {
"watch-ts": "lerna exec --parallel -- yarn run watch-ts",
"build-ts": "lerna exec --parallel -- yarn run build-ts",
"test": "lerna exec --parallel --stream --sort --scope @xlnt/gnarly-core -- yarn run test",
"watch-ts": "lerna run watch-ts --parallel",
"build-ts": "lerna run build-ts",
"test": "lerna run test --scope @xlnt/gnarly-core",
"lint": "lerna run lint --parallel",
"clean": "lerna clean --yes && lerna exec --parallel -- rm -r ./lib",
"coverage": "yarn run coverage:generate && yarn run coverage:submit",
"coverage:generate": "lerna exec --parallel --scope @xlnt/gnarly-core -- yarn run coverage",
"coverage:generate": "lerna run coverage --parallel --scope @xlnt/gnarly-core",
"coverage:submit": "lcov-result-merger 'packages/**/lcov.info' | coveralls",
"pkg": "lerna run pkg --scope=@xlnt/gnarly-bin",
"docker-build": "docker build -t shrugs/gnarly-test:demo .",
"docker-push": "docker push shrugs/gnarly-test:demo",
"deploy": "yarn run build-ts && yarn run pkg && yarn run docker-build && yarn run docker-push"
},
"devDependencies": {
"@types/chai-spies": "^0.0.1",
"@types/mocha": "^2.2.48",
"@types/node": "^9.4.0",
"@types/uuid": "^3.4.3",
"coveralls": "^3.0.2",
"lcov-result-merger": "^3.1.0",
"lerna": "^2.11.0",
"lerna": "^3.0.3",
"nodemon": "^1.14.12",
"ts-node": "^5.0.1",
"tslint": "^5.9.1"
"ts-node": "^7.0.1",
"tslint": "^5.11.0"
},
"workspaces": [
"packages/*"
Expand Down
15 changes: 6 additions & 9 deletions packages/gnarly-bin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
"ts-start": "ts-node --no-cache src/index.ts",
"start": "node lib/index.js",
"watch": "concurrently -k -p \"[{name}]\" -n \"TypeScript,Node\" -c \"cyan.bold,green.bold\" \"npm run watch-ts\" \"npm run watch-node\"",
"test": "nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace test/**/*.spec.ts",
"watch-test": " mocha --watch --watch-extensions ts -r ts-node/register test/**/*.spec.ts",
"test": "nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace 'test/**/*.spec.ts'",
"watch-test": " mocha --watch --watch-extensions ts -r ts-node/register 'test/**/*.spec.ts'",
"coverage": "nyc report --reporter=text-lcov > ./lcov.info",
"build-ts": "tsc",
"watch-ts": "tsc -w",
"tslint": "tslint --project .",
"lint": "tslint --project .",
"pkg": "pkg --targets node9-linux-x64,node9-macos-x64 --out-path ./pkg ."
},
"pkg": {
Expand All @@ -35,19 +35,16 @@
"license": "Apache-2.0",
"devDependencies": {
"@types/chai": "^4.1.4",
"@types/chai-spies": "^1.0.0",
"@types/node": "^10.5.7",
"@xlnt/typescript-typings": "^0.6.0",
"chai": "^4.1.2",
"chai-spies": "^1.0.0",
"mocha": "^5.2.0",
"nyc": "^12.0.2",
"pkg": "^4.3.1",
"rosie": "^2.0.1",
"source-map-support": "^0.5.6",
"ts-node": "^6.0.3",
"tslint": "^5.9.1",
"typescript": "^2.8.3"
"ts-node": "^7.0.1",
"tslint": "^5.11.0",
"typescript": "^3.0.1"
},
"publishConfig": {
"access": "public"
Expand Down
7 changes: 5 additions & 2 deletions packages/gnarly-bin/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ const main = async () => {
process.on('SIGINT', gracefulExit)
process.on('SIGTERM', gracefulExit)

await gnarly.reset(process.env.GNARLY_RESET === 'true')
await gnarly.shaka(process.env.LATEST_BLOCK_HASH)
const GNARLY_RESET = (process.env.GNARLY_RESET || 'false') === 'true'
const LATEST_BLOCK_HASH = process.env.LATEST_BLOCK_HASH || null

await gnarly.reset(GNARLY_RESET)
await gnarly.shaka(LATEST_BLOCK_HASH)
}

process.on('unhandledRejection', (error) => {
Expand Down
13 changes: 3 additions & 10 deletions packages/gnarly-bin/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
{
"extends": "../../tsconfig.json",
"extends": "../tsconfig.build.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "lib"
},
"include": [
"src/**/*"
],
"references": [
{ "path": "../../gnarly-core" },
{ "path": "../../gnarly-reducer-block-meta" },
{ "path": "../../gnarly-reducer-erc721" },
{ "path": "../../gnarly-reducer-events" }
]
"include": ["src"]
}
13 changes: 6 additions & 7 deletions packages/gnarly-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
"prepare": "npm run build",
"build": "npm run build-ts && npm run tslint",
"start": "node lib/index.js",
"test": "nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace test/**/*.spec.ts",
"watch-test": " mocha --watch --watch-extensions ts -r ts-node/register -r source-map-support/register --full-trace test/**/*.spec.ts",
"test": "TS_NODE_PROJECT=./tsconfig.test.json nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace 'test/**/*.spec.ts'",
"watch-test": " TS_NODE_PROJECT=./tsconfig.test.json mocha --watch --watch-extensions ts -r ts-node/register -r source-map-support/register --full-trace 'test/**/*.spec.ts'",
"coverage": "nyc report --reporter=text-lcov > ./lcov.info",
"build-ts": "tsc",
"watch-ts": "tsc -w",
"tslint": "tslint --project ."
"lint": "tslint --project ."
},
"files": [
"lib"
Expand Down Expand Up @@ -52,7 +52,6 @@
"@types/lodash.identity": "^3.0.3",
"@types/lodash.isplainobject": "^4.0.3",
"@types/node": "^10.5.7",
"@xlnt/typescript-typings": "^0.6.0",
"bn-chai": "^1.0.1",
"chai": "^4.1.2",
"chai-spies": "^1.0.0",
Expand All @@ -61,9 +60,9 @@
"nyc": "^12.0.2",
"rosie": "^2.0.1",
"source-map-support": "^0.5.6",
"ts-node": "^6.0.3",
"tslint": "^5.9.1",
"typescript": "2.6.1"
"ts-node": "^7.0.1",
"tslint": "^5.11.0",
"typescript": "^3.0.1"
},
"publishConfig": {
"access": "public"
Expand Down
123 changes: 81 additions & 42 deletions packages/gnarly-core/src/Blockstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const debugOnBlockAdd = makeDebug('gnarly-core:blockstream:onBlockAdd')
const debugOnBlockInvalidated = makeDebug('gnarly-core:blockstream:onBlockInvalidated')

import {
Block as BlockstreamBlock,
BlockAndLogStreamer,
} from 'ethereumjs-blockstream'
import 'isomorphic-fetch'
Expand All @@ -15,8 +14,6 @@ import uuid = require('uuid')
import { IJSONBlock } from './models/Block'
import { IJSONLog } from './models/Log'

import Ourbit from './ourbit'

import {
timeout,
toBN,
Expand All @@ -42,52 +39,68 @@ class BlockStream {
})

constructor (
private ourbit: Ourbit,
private onBlock: (block: BlockstreamBlock, syncing: boolean) => () => Promise<any>,
private interval: number = 5000,
private reducerKey: string,
private processTransaction: (txId: string, fn: () => Promise<void>, extra: object) => Promise<void> ,
private rollbackTransaction: (blockHash: string) => Promise<void>,
private onNewBlock: (block: IJSONBlock, syncing: boolean) => () => Promise < any > ,
private blockRetention: number = 100,
) {

this.streamer = new BlockAndLogStreamer(globalState.api.getBlockByHash, globalState.api.getLogs, {
blockRetention: this.blockRetention,
})
}

public start = async (fromBlockHash: string = null) => {
this.streamer = new BlockAndLogStreamer(globalState.api.getBlockByHash, globalState.api.getLogs, {
blockRetention: 100,
})
let localLatestBlock: IJSONBlock | null = null

this.onBlockAddedSubscriptionToken = this.streamer.subscribeToOnBlockAdded(this.onBlockAdd)
this.onBlockRemovedSubscriptionToken = this.streamer.subscribeToOnBlockRemoved(this.onBlockInvalidated)
// the primary purpose of this function to to extend the historical block reduction
// beyond the blockRetention limit provided to ethereumjs-blockstream
// because we might have stopped tracking blocks for longer than ~100 blocks and need to catch up

let startBlockNumber
if (fromBlockHash === null) {
// if no hash provided, we're starting from HEAD
startBlockNumber = toBN(
(await globalState.api.getLatestBlock()).number,
)
const remoteLatestBlock = await globalState.api.getLatestBlock()

if (fromBlockHash !== null) {
// ^ if fromBlockHash is provided, it takes priority
debug('Continuing from blockHash %s', fromBlockHash)

// so look up the latest block we know about
localLatestBlock = await globalState.api.getBlockByHash(fromBlockHash)

// need to load that block into the local chain so handlers trigger correctly
// when we defer to the ethereumjs-blockstream reconciliation algorithm where it fetches
// its own historical blocks
this.streamer.reconcileNewBlock(localLatestBlock)
} else {
// otherwise get the expected block's number
const startFromBlock = await globalState.api.getBlockByHash(fromBlockHash)
startBlockNumber = toBN(startFromBlock.number).add(toBN(1))
// ^ +1 because we already know about this block and we want the next
// we are starting from head
debug('Starting from HEAD')

// ask the remote for the latest "local" block
localLatestBlock = await globalState.api.getLatestBlock()
}

debug('Beginning from block %d', startBlockNumber)
const remoteLatestBlockNumber = toBN(remoteLatestBlock.number)
const localLatestBlockNumber = toBN(localLatestBlock.number)

// get the latest block
let latestBlockNumber = toBN(
(await globalState.api.getLatestBlock()).number,
)
// subscribe to changes in chain
this.onBlockAddedSubscriptionToken = this.streamer.subscribeToOnBlockAdded(this.onBlockAdd)
this.onBlockRemovedSubscriptionToken = this.streamer.subscribeToOnBlockRemoved(this.onBlockInvalidated)

debug('Local block number: %d. Remote block number: %d', localLatestBlockNumber, remoteLatestBlockNumber)

// if we're not at that block number, start pulling the blocks
// from before until we catch up, then track latest
if (latestBlockNumber.gt(startBlockNumber)) {
let syncUpToNumber = await this.latestRemoteNumberWithRetentionBuffer()
// if we're not at that block number, start pulling the blocks from history
// until we enter the block retention limit
// once we've gotten to the block retention limit, we need to defer to blockstream's chain
// reconciliation algorithm
if (localLatestBlockNumber.lt(syncUpToNumber)) {
debugFastForward(
'Starting from %d and continuing to %d',
startBlockNumber.toNumber(),
latestBlockNumber.toNumber(),
localLatestBlockNumber.toNumber(),
remoteLatestBlockNumber.toNumber(),
)
this.syncing = true
let i = startBlockNumber.clone()
while (i.lt(latestBlockNumber)) {
let i = localLatestBlockNumber.clone()
while (i.lt(syncUpToNumber)) {
// if we're at the top of the queue
// wait a bit and then add the thing
while (this.pendingTransactions.size >= MAX_QUEUE_LENGTH) {
Expand All @@ -101,14 +114,15 @@ class BlockStream {
const block = await globalState.api.getBlockByNumber(i)
debugFastForward(
'block %s (%s)',
block.number,
toBN(block.number).toString(),
block.hash,
)
i = i.add(toBN(1))
await this.streamer.reconcileNewBlock(block)

i = toBN(block.number).add(toBN(1))
// TODO: easy optimization, only check latest block on the last
// iteration
latestBlockNumber = toBN((await globalState.api.getLatestBlock()).number)
syncUpToNumber = await this.latestRemoteNumberWithRetentionBuffer()
}

this.syncing = false
Expand All @@ -128,35 +142,52 @@ class BlockStream {
debug('Done! Exiting...')
}

private onBlockAdd = async (block: BlockstreamBlock) => {
public initWithHistoricalBlocks = async (historicalBlocks: IJSONBlock[] = []): Promise<any> => {
// ^ if historicalBlocks provided, reconcile blocks
debug(
'Initializing history with last historical block %s',
toBN(historicalBlocks[historicalBlocks.length - 1].number),
)

for (const block of historicalBlocks) {
await this.streamer.reconcileNewBlock(block)
}
}

private onBlockAdd = async (block: IJSONBlock) => {
const pendingTransaction = async () => {
debugOnBlockAdd(
'block %s (%s)',
block.number,
block.hash,
)

return this.ourbit.processTransaction(
await this.processTransaction(
uuid.v4(),
this.onBlock(block, this.syncing),
this.onNewBlock(block, this.syncing),
{
blockHash: block.hash,
},
)

await globalState.store.saveHistoricalBlock(this.reducerKey, this.blockRetention, block)
}

this.pendingTransactions.add(pendingTransaction)
}

private onBlockInvalidated = (block: BlockstreamBlock) => {
private onBlockInvalidated = (block: IJSONBlock) => {
const pendingTransaction = async () => {
debugOnBlockInvalidated(
'block %s (%s)',
block.number,
block.hash,
)

return this.ourbit.rollbackTransaction(block.hash)
// when a block is invalidated, rollback the transaction
await this.rollbackTransaction(block.hash)
// and then delete the historical block
await globalState.store.deleteHistoricalBlock(this.reducerKey, block.hash)
}

this.pendingTransactions.add(pendingTransaction)
Expand All @@ -167,6 +198,14 @@ class BlockStream {
await this.streamer.reconcileNewBlock(await globalState.api.getLatestBlock())
})
}

private latestRemoteNumberWithRetentionBuffer = async () => {
return toBN(
(await globalState.api.getLatestBlock()).number,
).sub(toBN(this.blockRetention - 10))
// ^ manually import historical blocks until we're within 90 blocks of HEAD
// and then we can use blockstream's reconciliation algorithm
}
}

export default BlockStream
Loading

0 comments on commit d66431d

Please sign in to comment.