Skip to content
This repository has been archived by the owner on Jun 10, 2019. It is now read-only.

refactor(state): locks and concurrency #64

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 36 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"leveldown": "^4.0.1",
"levelup": "^3.1.1",
"lodash": "^4.17.11",
"multi-lock-queue": "~1.0.2",
"plasma-contracts": "git+https://github.com/plasma-group/plasma-contracts.git",
"plasma-explorer": "git+https://github.com/plasma-group/plasma-explorer.git",
"plasma-utils": "git+https://github.com/plasma-group/plasma-utils.git",
Expand Down
119 changes: 43 additions & 76 deletions src/state-manager/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const log = require('debug')('info:state')
const models = require('plasma-utils').serialization.models
const UnsignedTransaction = models.UnsignedTransaction
const SignedTransaction = models.SignedTransaction
const { createLockingQueue } = require('multi-lock-queue')
const itNext = require('../utils.js').itNext
const itEnd = require('../utils.js').itEnd
const getDepositTransaction = require('../utils.js').getDepositTransaction
Expand All @@ -22,11 +23,6 @@ const BLOCKNUMBER_BYTE_SIZE = require('../constants.js').BLOCKNUMBER_BYTE_SIZE
const DEPOSIT_TX_LENGTH = 73
const RECENT_TX_CACHE_SIZE = 30

// ************* HELPER FUNCTIONS ************* //
const timeout = ms => new Promise(resolve => setTimeout(resolve, ms))
const timeoutAmt = () => 0
// const timeoutAmt = () => Math.floor(Math.random() * 2)

function decodeTransaction (encoding) {
let tx
if (encoding.length === DEPOSIT_TX_LENGTH) {
Expand Down Expand Up @@ -65,6 +61,7 @@ class State {
this.lock = {}
this.recentTransactions = []
this.isCurrentBlockEmpty = true
this.queue = createLockingQueue()
}

async init () {
Expand Down Expand Up @@ -99,17 +96,13 @@ class State {
if (this.isCurrentBlockEmpty) {
throw new Error('Block is empty! Cannot start new block.')
}
if (this.lock.all === true) {
if (this.queue.isPaused()) {
throw new Error('Attempting to start a new block when a global lock is already active')
}
// Start a global lock as we increment the block number. Note that we will have to wait until all other locks are released
this.lock.all = true
// Wait until all other locks are released
while (Object.keys(this.lock).length !== 1) {
log('Waiting to acquire global lock')
await timeout(timeoutAmt())
}
// Everything should be locked now that we have a `lock.all` activated. Time to increment the blockNumber

log('Waiting to acquire global lock')
// Wait for all locks to be released and start a global lock as we increment the block number.
await this.queue.pause()
this.blockNumber = this.blockNumber.add(new BN(1))
// Create a new block
await this.db.put(Buffer.from('blockNumber'), this.blockNumber.toArrayLike(Buffer, 'big', BLOCKNUMBER_BYTE_SIZE))
Expand All @@ -120,35 +113,11 @@ class State {
this.writeStream = fs.createWriteStream(this.tmpTxLogFile, { flags: 'a' })
// Set empty block flag
this.isCurrentBlockEmpty = true
// Release our lock
delete this.lock.all
this.queue.resume()
log('#### Started new Block #', this.blockNumber.toString())
return this.blockNumber
}

attemptAcquireLocks (k) {
const keywords = k.slice() // Make a copy of the array to make sure we don't pollute anything when we add the `all` keyword
log('Attempting to acquire lock for:', keywords)
keywords.push('all')
if (keywords.some((val) => { return this.lock.hasOwnProperty(val) })) {
log('Failed')
// Failed to acquire locks
return false
}
// Acquire locks
for (let i = 0; i < keywords.length - 1; i++) {
this.lock[keywords[i]] = true
}
return true
}

releaseLocks (keywords) {
// Pop off our lock queue
for (const keyword of keywords) {
delete this.lock[keyword]
}
}

async addDeposit (recipient, token, start, end) {
// Check that we haven't already recorded this deposit
try {
Expand All @@ -157,10 +126,14 @@ class State {
return
} catch (err) {
}
while (!this.attemptAcquireLocks([token.toString(16)])) {
// Wait before attempting again
await timeout(timeoutAmt())
}

return this.queue.enqueue({
locks: [token.toString(16)],
fn: () => this._addDeposit(...arguments)
})
}

async _addDeposit (recipient, token, start, end) {
const deposit = getDepositTransaction(Web3.utils.bytesToHex(recipient), token, start, end, this.blockNumber)
const depositEncoded = deposit.encoded
try {
Expand All @@ -174,7 +147,7 @@ class State {
} catch (err) {
throw err
}
this.releaseLocks([recipient, token.toString(16)])

log('Added deposit with token type:', token.toString('hex'), ', start:', start.toString('hex'), 'and end:', end.toString('hex'))
return depositEncoded
}
Expand All @@ -195,19 +168,6 @@ class State {
}
}

async getTransactionLock (tx) {
const senders = tx.transfers.map((transfer) => transfer.sender)
while (!this.attemptAcquireLocks(senders)) {
// Wait before attempting again
await timeout(timeoutAmt())
}
}

async releaseTransactionLock (tx) {
const senders = tx.transfers.map((transfer) => transfer.sender)
this.releaseLocks(senders)
}

validateAffectedRanges (tx) {
// For all affected ranges, check all affected ranges are owned by the correct sender and blockNumber
for (const tr of tx.transfers) {
Expand Down Expand Up @@ -282,20 +242,23 @@ class State {
// Check that the transaction is well formatted
this.validateTransaction(tx)
// Acquire lock on all of the transfer record senders
await this.getTransactionLock(tx)
return this.queue.enqueue({
locks: getTxLocks(tx),
fn: () => this._addTransaction(tx)
})
}

async _addTransaction (tx) {
// Acquire lock on all of the transfer record senders
log('Attempting to add transaction from:', tx.transfers[0].sender)
try {
// Get the ranges which the transaction affects and attach them to the transaction object
await this.addAffectedRangesToTx(tx)
// Check that all of the affected ranges are valid
await this.validateAffectedRanges(tx)
// All checks have passed, now write to the DB
await this.writeTransactionToDB(tx)
} catch (err) {
this.releaseTransactionLock(tx)
throw err
}
this.releaseTransactionLock(tx)

// Get the ranges which the transaction affects and attach them to the transaction object
await this.addAffectedRangesToTx(tx)
// Check that all of the affected ranges are valid
await this.validateAffectedRanges(tx)
// All checks have passed, now write to the DB
await this.writeTransactionToDB(tx)

log('Added transaction from:', tx.transfers[0].recipient)
this.addRecentTransaction(tx)
// Record that this block is not empty
Expand Down Expand Up @@ -342,11 +305,14 @@ class State {
return affectedRanges
}

async getOwnedRanges (address) {
while (!this.attemptAcquireLocks([address])) {
// Wait before attempting again
await timeout(timeoutAmt())
}
getOwnedRanges (address) {
return this.queue.enqueue({
locks: [address],
fn: () => this._getOwnedRanges(address)
})
}

async _getOwnedRanges (address) {
// Get the ranges
const addressBuffer = Buffer.from(Web3.utils.hexToBytes(address))
const it = this.db.iterator({
Expand All @@ -359,7 +325,6 @@ class State {
result = await itNext(it)
}
await itEnd(it)
this.releaseLocks([address])
return ownedRanges
}

Expand Down Expand Up @@ -391,4 +356,6 @@ class State {
}
}

const getTxLocks = tx => tx.transfers.map((transfer) => transfer.sender)

module.exports = State