-
Notifications
You must be signed in to change notification settings - Fork 2
/
blockchain-processor.js
397 lines (335 loc) · 14.1 KB
/
blockchain-processor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
/*!
* tracker/blockchain-processor.js
* Copyright © 2019 – Katana Cryptographic Ltd. All Rights Reserved.
*/
import zmq from 'zeromq/v5-compat.js'
import { Sema } from 'async-sema'
import util from '../lib/util.js'
import Logger from '../lib/logger.js'
import db from '../lib/db/mysql-db-wrapper.js'
import network from '../lib/bitcoin/network.js'
import { createRpcClient, waitForBitcoindRpcApi } from '../lib/bitcoind-rpc/rpc-client.js'
import keysFile from '../keys/index.js'
import Block from './block.js'
import * as blocksProcessor from './blocks-processor.js'
const keys = keysFile[network.key]
/**
* A class allowing to process the blockchain
*/
class BlockchainProcessor {
/**
* Constructor
* @param {object} notifSock - ZMQ socket used for notifications
*/
constructor(notifSock) {
// RPC client
this.client = createRpcClient()
// ZeroMQ socket for bitcoind blocks messages
this.blkSock = null
// Initialize a semaphor protecting the onBlockHash() method
this._onBlockHashSemaphor = new Sema(1, { capacity: 50 })
// Array of worker threads used for parallel processing of blocks
this.blockWorkers = []
// Flag tracking Initial Block Download Mode
this.isIBD = true
// Initialize the blocks processor
blocksProcessor.init(notifSock)
}
/**
* Start processing the blockchain
* @returns {Promise<void>}
*/
async start() {
await this.catchup()
await this.initSockets()
}
/**
* Start processing the blockchain
*/
async stop() {}
/**
* Tracker process startup
* @returns {Promise<void>}
*/
async catchup() {
try {
await waitForBitcoindRpcApi()
const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const daemonNbHeaders = info.headers
// Consider that we are in IBD mode if Dojo is far in the past (> 13,000 blocks)
this.isIBD = (highest.blockHeight < 681000) || (highest.blockHeight < daemonNbHeaders - 13000)
return this.isIBD ? this.catchupIBDMode() : this.catchupNormalMode()
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.catchup()')
await util.delay(2000)
return this.catchup()
}
}
/**
* Tracker process startup (normal mode)
* 1. Grab the latest block height from the daemon
* 2. Pull all block headers after database last known height
* 3. Process those block headers
*
* @returns {Promise<void>}
*/
async catchupIBDMode() {
try {
Logger.info('Tracker : Tracker Startup (IBD mode)')
// Get highest block processed by the tracker
const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const daemonNbBlocks = info.blocks
const daemonNbHeaders = info.headers
const dbMaxHeight = highest.blockHeight
let previousBlockId = highest.blockID
// If no header or block loaded by bitcoind => try later
if (daemonNbHeaders === 0 || daemonNbBlocks === 0) {
Logger.info('Tracker : New attempt scheduled in 30s (waiting for block headers)')
return util.delay(30000).then(() => {
return this.catchupIBDMode()
})
// If we have more blocks to load in db
} else if (daemonNbHeaders - 1 > dbMaxHeight) {
// If blocks need to be downloaded by bitcoind => try later
if (daemonNbBlocks - 1 <= dbMaxHeight) {
Logger.info('Tracker : New attempt scheduled in 10s (waiting for blocks)')
return util.delay(10000).then(() => {
return this.catchupIBDMode()
})
// If some blocks are ready for an import in db
} else {
const blockRange = util.range(dbMaxHeight + 1, daemonNbBlocks + 1)
Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
await util.seriesCall(blockRange, async height => {
try {
const blockHash = await this.client.getblockhash({ height })
const header = await this.client.getblockheader({ blockhash: blockHash, verbose: true })
// eslint-disable-next-line require-atomic-updates
previousBlockId = await this.processBlockHeader(header, previousBlockId)
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()')
process.exit()
}
}, 'Tracker syncing', true)
// Schedule a new iteration (in case more blocks need to be loaded)
Logger.info('Tracker : Start a new iteration')
return this.catchupIBDMode()
}
// If we are synced
} else {
this.isIBD = false
}
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.catchupIBDMode()')
throw error
}
}
/**
* Tracker process startup (normal mode)
* 1. Grab the latest block height from the daemon
* 2. Pull all block headers after database last known height
* 3. Process those block headers
*
* @returns {Promise<void>}
*/
async catchupNormalMode() {
try {
Logger.info('Tracker : Tracker Startup (normal mode)')
// Get highest block processed by the tracker
const [highest, info] = await Promise.all([db.getHighestBlock(), this.client.getblockchaininfo()])
const daemonNbBlocks = info.blocks
if (daemonNbBlocks === highest.blockHeight) return null
const blockRange = util.range(highest.blockHeight, daemonNbBlocks + 1)
Logger.info(`Tracker : Sync ${blockRange.length} blocks`)
try {
return this.processBlockRange(blockRange)
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.catchupNormalMode()')
process.exit()
}
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.catchupNormalMode()')
}
}
/**
* Initialiaze ZMQ sockets
*/
initSockets() {
// Socket listening to bitcoind Blocks messages
this.blkSock = zmq.socket('sub')
this.blkSock.connect(keys.bitcoind.zmqBlk)
this.blkSock.subscribe('hashblock')
this.blkSock.on('message', (topic, message) => {
switch (topic.toString()) {
case 'hashblock':
this.onBlockHash(message)
break
default:
Logger.info(`Tracker : ${topic.toString()}`)
}
})
Logger.info('Tracker : Listening for blocks')
}
/**
* Upon receipt of a new block hash, retrieve the block header from bitcoind via
* RPC. Continue pulling block headers back through the chain until the database
* contains header.previousblockhash, adding the headers to a stack. If the
* previousblockhash is not found on the first call, this is either a chain
* re-org or the tracker missed blocks during a shutdown.
*
* Once the chain has bottomed out with a known block in the database, delete
* all known database transactions confirmed in blocks at heights greater than
* the last known block height. These transactions are orphaned but may reappear
* in the new chain. Notify relevant accounts of balance updates /
* transaction confirmation counts.
*
* Delete block entries not on the main chain.
*
* Forward-scan through the block headers, pulling the full raw block hex via
* RPC. The raw block contains all transactions and is parsed by bitcoinjs-lib.
* Add the block to the database. Run checkTransaction for each transaction in
* the block that is not in the database. Confirm all transactions in the block.
*
* After each block, query bitcoin against all database unconfirmed outputs
* to see if they remain in the mempool or have been confirmed in blocks.
* Malleated transactions entering the wallet will disappear from the mempool on
* block confirmation.
*
* @param {Buffer} buf - block
* @returns {Promise<void>}
*/
async onBlockHash(buf) {
try {
// Acquire the semaphor
await this._onBlockHashSemaphor.acquire()
const blockHash = buf.toString('hex')
let headers = null
try {
const header = await this.client.getblockheader({ blockhash: blockHash, verbose: true })
Logger.info(`Tracker : Block #${header.height} ${blockHash}`)
// Grab all headers between this block and last known
headers = await this.chainBacktrace([header])
} catch (error) {
Logger.error(error, `Tracker : BlockchainProcessor.onBlockHash() : error in getblockheader(${blockHash})`)
}
if (headers == null)
return null
// Reverse headers to put oldest first
headers.reverse()
const deepest = headers[0]
const knownHeight = deepest.height - 1
// Cancel confirmation of transactions
// and delete blocks after the last known block height
await this.rewind(knownHeight)
// Process the blocks
return await this.processBlocks(headers)
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.onBlockHash()')
} finally {
// Release the semaphor
await this._onBlockHashSemaphor.release()
}
}
/**
* Zip back up the blockchain until a known prevHash is found, returning all
* block headers from last header in the array to the block after last known.
* @param {object[]} headers - array of block headers
* @returns {Promise}
*/
async chainBacktrace(headers) {
// Block deepest in the blockchain is the last on the list
const deepest = headers[headers.length - 1]
if (headers.length > 1)
Logger.info(`Tracker : chainBacktrace @ height ${deepest.height}, ${headers.length} blocks`)
// Look for previous block in the database
const block = await db.getBlockByHash(deepest.previousblockhash)
if (block == null) {
// Previous block does not exist in database. Grab from bitcoind
const header = await this.client.getblockheader({ blockhash: deepest.previousblockhash, verbose: true })
headers.push(header)
return this.chainBacktrace(headers)
} else {
// Previous block does exist. Return headers
return headers
}
}
/**
* Cancel confirmation of transactions
* and delete blocks after a given height
* @param {number} height - height of last block maintained
* @returns {Promise<void>}
*/
async rewind(height) {
// Retrieve transactions confirmed in reorg'd blocks
const txs = await db.getTransactionsConfirmedAfterHeight(height)
if (txs.length > 0) {
// Cancel confirmation of transactions included in reorg'd blocks
Logger.info(`Tracker : Backtrace: unconfirm ${txs.length} transactions in reorg`)
const txids = txs.map(t => t.txnTxid)
await db.unconfirmTransactions(txids)
}
await db.deleteBlocksAfterHeight(height)
}
/**
* Rescan a range of blocks
* @param {number} fromHeight - height of first block
* @param {number} toHeight - height of last block
* @returns {Promise}
*/
async rescanBlocks(fromHeight, toHeight) {
// Get highest block processed by the tracker
const highest = await db.getHighestBlock()
const dbMaxHeight = highest.blockHeight
if (toHeight == null)
toHeight = fromHeight
toHeight = Math.min(toHeight, dbMaxHeight)
const blockRange = util.range(fromHeight, toHeight + 1)
Logger.info(`Blocks Rescan : starting a rescan for ${blockRange.length} blocks`)
try {
return this.processBlockRange(blockRange)
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.rescan()')
throw error
}
}
/**
* Process a list of blocks
* @param {object[]} headers - array of block headers
*/
async processBlocks(headers) {
const chunks = util.splitList(headers, blocksProcessor.nbWorkers)
await util.seriesCall(chunks, async chunk => {
return blocksProcessor.processChunk(chunk)
})
}
/**
* Process a range of blocks
* @param {number[]} heights - a range of block heights
*/
async processBlockRange(heights) {
const chunks = util.splitList(heights, blocksProcessor.nbWorkers)
return util.seriesCall(chunks, async chunk => {
const headers = await util.parallelCall(chunk, async height => {
const hash = await this.client.getblockhash({ height })
return await this.client.getblockheader({ blockhash: hash })
})
return this.processBlocks(headers)
})
}
/**
* Process a block header
* @param {object} header - block header
* @param {number} prevBlockID - id of previous block
* @returns {Promise}
*/
async processBlockHeader(header, prevBlockID) {
try {
const block = new Block(null, header)
return block.checkBlockHeader(prevBlockID)
} catch (error) {
Logger.error(error, 'Tracker : BlockchainProcessor.processBlockHeader()')
throw error
}
}
}
export default BlockchainProcessor