Skip to content
This repository has been archived by the owner on Jan 22, 2019. It is now read-only.

implement block-retention #47

Merged
merged 17 commits into from
Aug 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ before_script:
after_success:
- yarn run coverage
script:
- yarn run lint
- yarn run test
notifications:
slack:
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ cd gnarly
# install yarn if you haven't already
# $ npm i -g yarn

# install lerna
yarn global add lerna
# install workspace dependencies, which includes lerna
yarn install

# boostrap the packages within this project (install deps, linking, etc)
lerna bootstrap

# now this command should pass:
yarn run build-ts
```

Now you should be able to run the tests with
Expand Down
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"lerna": "2.11.0",
"lerna": "3.0.3",
"packages": [
"packages/*"
],
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 9 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@
"private": true,
"version": "0.0.4-alpha.1",
"scripts": {
"watch-ts": "lerna exec --parallel -- yarn run watch-ts",
"build-ts": "lerna exec --parallel -- yarn run build-ts",
"test": "lerna exec --parallel --stream --sort --scope @xlnt/gnarly-core -- yarn run test",
"watch-ts": "lerna run watch-ts --parallel",
"build-ts": "lerna run build-ts",
"test": "lerna run test --scope @xlnt/gnarly-core",
"lint": "lerna run lint --parallel",
"clean": "lerna clean --yes && lerna exec --parallel -- rm -r ./lib",
"coverage": "yarn run coverage:generate && yarn run coverage:submit",
"coverage:generate": "lerna exec --parallel --scope @xlnt/gnarly-core -- yarn run coverage",
"coverage:generate": "lerna run coverage --parallel --scope @xlnt/gnarly-core",
"coverage:submit": "lcov-result-merger 'packages/**/lcov.info' | coveralls",
"pkg": "lerna run pkg --scope=@xlnt/gnarly-bin",
"docker-build": "docker build -t shrugs/gnarly-test:demo .",
"docker-push": "docker push shrugs/gnarly-test:demo",
"deploy": "yarn run build-ts && yarn run pkg && yarn run docker-build && yarn run docker-push"
},
"devDependencies": {
"@types/chai-spies": "^0.0.1",
"@types/mocha": "^2.2.48",
"@types/node": "^9.4.0",
"@types/uuid": "^3.4.3",
"coveralls": "^3.0.2",
"lcov-result-merger": "^3.1.0",
"lerna": "^2.11.0",
"lerna": "^3.0.3",
"nodemon": "^1.14.12",
"ts-node": "^5.0.1",
"tslint": "^5.9.1"
"ts-node": "^7.0.1",
"tslint": "^5.11.0"
},
"workspaces": [
"packages/*"
Expand Down
15 changes: 6 additions & 9 deletions packages/gnarly-bin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
"ts-start": "ts-node --no-cache src/index.ts",
"start": "node lib/index.js",
"watch": "concurrently -k -p \"[{name}]\" -n \"TypeScript,Node\" -c \"cyan.bold,green.bold\" \"npm run watch-ts\" \"npm run watch-node\"",
"test": "nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace test/**/*.spec.ts",
"watch-test": " mocha --watch --watch-extensions ts -r ts-node/register test/**/*.spec.ts",
"test": "nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace 'test/**/*.spec.ts'",
"watch-test": " mocha --watch --watch-extensions ts -r ts-node/register 'test/**/*.spec.ts'",
"coverage": "nyc report --reporter=text-lcov > ./lcov.info",
"build-ts": "tsc",
"watch-ts": "tsc -w",
"tslint": "tslint --project .",
"lint": "tslint --project .",
"pkg": "pkg --targets node9-linux-x64,node9-macos-x64 --out-path ./pkg ."
},
"pkg": {
Expand All @@ -35,19 +35,16 @@
"license": "Apache-2.0",
"devDependencies": {
"@types/chai": "^4.1.4",
"@types/chai-spies": "^1.0.0",
"@types/node": "^10.5.7",
"@xlnt/typescript-typings": "^0.6.0",
"chai": "^4.1.2",
"chai-spies": "^1.0.0",
"mocha": "^5.2.0",
"nyc": "^12.0.2",
"pkg": "^4.3.1",
"rosie": "^2.0.1",
"source-map-support": "^0.5.6",
"ts-node": "^6.0.3",
"tslint": "^5.9.1",
"typescript": "^2.8.3"
"ts-node": "^7.0.1",
"tslint": "^5.11.0",
"typescript": "^3.0.1"
},
"publishConfig": {
"access": "public"
Expand Down
7 changes: 5 additions & 2 deletions packages/gnarly-bin/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,11 @@ const main = async () => {
process.on('SIGINT', gracefulExit)
process.on('SIGTERM', gracefulExit)

await gnarly.reset(process.env.GNARLY_RESET === 'true')
await gnarly.shaka(process.env.LATEST_BLOCK_HASH)
const GNARLY_RESET = (process.env.GNARLY_RESET || 'false') === 'true'
const LATEST_BLOCK_HASH = process.env.LATEST_BLOCK_HASH || null

await gnarly.reset(GNARLY_RESET)
await gnarly.shaka(LATEST_BLOCK_HASH)
}

process.on('unhandledRejection', (error) => {
Expand Down
13 changes: 3 additions & 10 deletions packages/gnarly-bin/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
{
"extends": "../../tsconfig.json",
"extends": "../tsconfig.build.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "lib"
},
"include": [
"src/**/*"
],
"references": [
{ "path": "../../gnarly-core" },
{ "path": "../../gnarly-reducer-block-meta" },
{ "path": "../../gnarly-reducer-erc721" },
{ "path": "../../gnarly-reducer-events" }
]
"include": ["src"]
}
13 changes: 6 additions & 7 deletions packages/gnarly-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
"prepare": "npm run build",
"build": "npm run build-ts && npm run tslint",
"start": "node lib/index.js",
"test": "nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace test/**/*.spec.ts",
"watch-test": " mocha --watch --watch-extensions ts -r ts-node/register -r source-map-support/register --full-trace test/**/*.spec.ts",
"test": "TS_NODE_PROJECT=./tsconfig.test.json nyc --reporter=text mocha -r ts-node/register -r source-map-support/register --full-trace 'test/**/*.spec.ts'",
"watch-test": " TS_NODE_PROJECT=./tsconfig.test.json mocha --watch --watch-extensions ts -r ts-node/register -r source-map-support/register --full-trace 'test/**/*.spec.ts'",
"coverage": "nyc report --reporter=text-lcov > ./lcov.info",
"build-ts": "tsc",
"watch-ts": "tsc -w",
"tslint": "tslint --project ."
"lint": "tslint --project ."
},
"files": [
"lib"
Expand Down Expand Up @@ -52,7 +52,6 @@
"@types/lodash.identity": "^3.0.3",
"@types/lodash.isplainobject": "^4.0.3",
"@types/node": "^10.5.7",
"@xlnt/typescript-typings": "^0.6.0",
"bn-chai": "^1.0.1",
"chai": "^4.1.2",
"chai-spies": "^1.0.0",
Expand All @@ -61,9 +60,9 @@
"nyc": "^12.0.2",
"rosie": "^2.0.1",
"source-map-support": "^0.5.6",
"ts-node": "^6.0.3",
"tslint": "^5.9.1",
"typescript": "2.6.1"
"ts-node": "^7.0.1",
"tslint": "^5.11.0",
"typescript": "^3.0.1"
},
"publishConfig": {
"access": "public"
Expand Down
123 changes: 81 additions & 42 deletions packages/gnarly-core/src/Blockstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ const debugOnBlockAdd = makeDebug('gnarly-core:blockstream:onBlockAdd')
const debugOnBlockInvalidated = makeDebug('gnarly-core:blockstream:onBlockInvalidated')

import {
Block as BlockstreamBlock,
BlockAndLogStreamer,
} from 'ethereumjs-blockstream'
import 'isomorphic-fetch'
Expand All @@ -15,8 +14,6 @@ import uuid = require('uuid')
import { IJSONBlock } from './models/Block'
import { IJSONLog } from './models/Log'

import Ourbit from './ourbit'

import {
timeout,
toBN,
Expand All @@ -42,52 +39,68 @@ class BlockStream {
})

constructor (
private ourbit: Ourbit,
private onBlock: (block: BlockstreamBlock, syncing: boolean) => () => Promise<any>,
private interval: number = 5000,
private reducerKey: string,
private processTransaction: (txId: string, fn: () => Promise<void>, extra: object) => Promise<void> ,
private rollbackTransaction: (blockHash: string) => Promise<void>,
private onNewBlock: (block: IJSONBlock, syncing: boolean) => () => Promise < any > ,
private blockRetention: number = 100,
) {

this.streamer = new BlockAndLogStreamer(globalState.api.getBlockByHash, globalState.api.getLogs, {
blockRetention: this.blockRetention,
})
}

public start = async (fromBlockHash: string = null) => {
this.streamer = new BlockAndLogStreamer(globalState.api.getBlockByHash, globalState.api.getLogs, {
blockRetention: 100,
})
let localLatestBlock: IJSONBlock | null = null

this.onBlockAddedSubscriptionToken = this.streamer.subscribeToOnBlockAdded(this.onBlockAdd)
this.onBlockRemovedSubscriptionToken = this.streamer.subscribeToOnBlockRemoved(this.onBlockInvalidated)
// the primary purpose of this function to to extend the historical block reduction
// beyond the blockRetention limit provided to ethereumjs-blockstream
// because we might have stopped tracking blocks for longer than ~100 blocks and need to catch up

let startBlockNumber
if (fromBlockHash === null) {
// if no hash provided, we're starting from HEAD
startBlockNumber = toBN(
(await globalState.api.getLatestBlock()).number,
)
const remoteLatestBlock = await globalState.api.getLatestBlock()

if (fromBlockHash !== null) {
// ^ if fromBlockHash is provided, it takes priority
debug('Continuing from blockHash %s', fromBlockHash)

// so look up the latest block we know about
localLatestBlock = await globalState.api.getBlockByHash(fromBlockHash)

// need to load that block into the local chain so handlers trigger correctly
// when we defer to the ethereumjs-blockstream reconciliation algorithm where it fetches
// its own historical blocks
this.streamer.reconcileNewBlock(localLatestBlock)
} else {
// otherwise get the expected block's number
const startFromBlock = await globalState.api.getBlockByHash(fromBlockHash)
startBlockNumber = toBN(startFromBlock.number).add(toBN(1))
// ^ +1 because we already know about this block and we want the next
// we are starting from head
debug('Starting from HEAD')

// ask the remote for the latest "local" block
localLatestBlock = await globalState.api.getLatestBlock()
}

debug('Beginning from block %d', startBlockNumber)
const remoteLatestBlockNumber = toBN(remoteLatestBlock.number)
const localLatestBlockNumber = toBN(localLatestBlock.number)

// get the latest block
let latestBlockNumber = toBN(
(await globalState.api.getLatestBlock()).number,
)
// subscribe to changes in chain
this.onBlockAddedSubscriptionToken = this.streamer.subscribeToOnBlockAdded(this.onBlockAdd)
this.onBlockRemovedSubscriptionToken = this.streamer.subscribeToOnBlockRemoved(this.onBlockInvalidated)

debug('Local block number: %d. Remote block number: %d', localLatestBlockNumber, remoteLatestBlockNumber)

// if we're not at that block number, start pulling the blocks
// from before until we catch up, then track latest
if (latestBlockNumber.gt(startBlockNumber)) {
let syncUpToNumber = await this.latestRemoteNumberWithRetentionBuffer()
// if we're not at that block number, start pulling the blocks from history
// until we enter the block retention limit
// once we've gotten to the block retention limit, we need to defer to blockstream's chain
// reconciliation algorithm
if (localLatestBlockNumber.lt(syncUpToNumber)) {
debugFastForward(
'Starting from %d and continuing to %d',
startBlockNumber.toNumber(),
latestBlockNumber.toNumber(),
localLatestBlockNumber.toNumber(),
remoteLatestBlockNumber.toNumber(),
)
this.syncing = true
let i = startBlockNumber.clone()
while (i.lt(latestBlockNumber)) {
let i = localLatestBlockNumber.clone()
while (i.lt(syncUpToNumber)) {
// if we're at the top of the queue
// wait a bit and then add the thing
while (this.pendingTransactions.size >= MAX_QUEUE_LENGTH) {
Expand All @@ -101,14 +114,15 @@ class BlockStream {
const block = await globalState.api.getBlockByNumber(i)
debugFastForward(
'block %s (%s)',
block.number,
toBN(block.number).toString(),
block.hash,
)
i = i.add(toBN(1))
await this.streamer.reconcileNewBlock(block)

i = toBN(block.number).add(toBN(1))
// TODO: easy optimization, only check latest block on the last
// iteration
latestBlockNumber = toBN((await globalState.api.getLatestBlock()).number)
syncUpToNumber = await this.latestRemoteNumberWithRetentionBuffer()
}

this.syncing = false
Expand All @@ -128,35 +142,52 @@ class BlockStream {
debug('Done! Exiting...')
}

private onBlockAdd = async (block: BlockstreamBlock) => {
public initWithHistoricalBlocks = async (historicalBlocks: IJSONBlock[] = []): Promise<any> => {
// ^ if historicalBlocks provided, reconcile blocks
debug(
'Initializing history with last historical block %s',
toBN(historicalBlocks[historicalBlocks.length - 1].number),
)

for (const block of historicalBlocks) {
await this.streamer.reconcileNewBlock(block)
}
}

private onBlockAdd = async (block: IJSONBlock) => {
const pendingTransaction = async () => {
debugOnBlockAdd(
'block %s (%s)',
block.number,
block.hash,
)

return this.ourbit.processTransaction(
await this.processTransaction(
uuid.v4(),
this.onBlock(block, this.syncing),
this.onNewBlock(block, this.syncing),
{
blockHash: block.hash,
},
)

await globalState.store.saveHistoricalBlock(this.reducerKey, this.blockRetention, block)
}

this.pendingTransactions.add(pendingTransaction)
}

private onBlockInvalidated = (block: BlockstreamBlock) => {
private onBlockInvalidated = (block: IJSONBlock) => {
const pendingTransaction = async () => {
debugOnBlockInvalidated(
'block %s (%s)',
block.number,
block.hash,
)

return this.ourbit.rollbackTransaction(block.hash)
// when a block is invalidated, rollback the transaction
await this.rollbackTransaction(block.hash)
// and then delete the historical block
await globalState.store.deleteHistoricalBlock(this.reducerKey, block.hash)
}

this.pendingTransactions.add(pendingTransaction)
Expand All @@ -167,6 +198,14 @@ class BlockStream {
await this.streamer.reconcileNewBlock(await globalState.api.getLatestBlock())
})
}

private latestRemoteNumberWithRetentionBuffer = async () => {
return toBN(
(await globalState.api.getLatestBlock()).number,
).sub(toBN(this.blockRetention - 10))
// ^ manually import historical blocks until we're within 90 blocks of HEAD
// and then we can use blockstream's reconciliation algorithm
}
}

export default BlockStream
Loading