Skip to content

Commit

Permalink
Renaming taskManager map property
Browse files Browse the repository at this point in the history
  • Loading branch information
Lyudmil Danailov authored and Lyudmil Danailov committed Jan 28, 2024
1 parent fbdbaf3 commit 1980390
Show file tree
Hide file tree
Showing 22 changed files with 106 additions and 117 deletions.
8 changes: 4 additions & 4 deletions blockchains/bnb/bnb_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class BNBWorker extends BaseWorker {
}

// The API returns most recent transactions first. Take block number from the first one.
this.lastBufferedBlock = fetchTransactions[0].blockHeight;
this.lastExportedBlock = fetchTransactions[0].blockHeight;

if (!this.bnbTradesMode) {
fetchTransactions = await fetch_transactions.replaceParentTransactionsWithChildren(this.queue,
Expand All @@ -88,7 +88,7 @@ class BNBWorker extends BaseWorker {

resultTransactions = getTransactionsWithKeys(fetchTransactions);

this.lastBufferedBlock = resultTransactions[resultTransactions.length - 1].blockHeight;
this.lastExportedBlock = resultTransactions[resultTransactions.length - 1].blockHeight;
}

// The upper limit of the load rate is enforced by p-queue.
Expand Down Expand Up @@ -118,7 +118,7 @@ class BNBWorker extends BaseWorker {
*/
getLastProcessedPosition() {
return {
blockNumber: this.lastBufferedBlock,
blockNumber: this.lastExportedBlock,
timestampReached: this.bnbTransactionsFetcher.getIntervalFetchEnd(),
fetchRangeMsec: this.bnbTransactionsFetcher.getMsecInFetchRange()
};
Expand Down Expand Up @@ -151,7 +151,7 @@ class BNBWorker extends BaseWorker {

this.bnbTransactionsFetcher = new BNBTransactionsFetcher(lastProcessedPosition.timestampReached, fetchRangeMsec,
this.bnbTradesMode);
this.lastBufferedBlock = lastProcessedPosition.blockNumber;
this.lastExportedBlock = lastProcessedPosition.blockNumber;

return lastProcessedPosition;
}
Expand Down
6 changes: 3 additions & 3 deletions blockchains/cardano/cardano_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class CardanoWorker extends BaseWorker {
}

async work() {
const fromBlock = this.lastBufferedBlock + 1;
const fromBlock = this.lastExportedBlock + 1;
if (fromBlock >= this.lastConfirmedBlock - 2) {
// We are up to date with the blockchain (aka 'current mode'). Sleep longer after finishing this loop.
// The last confirmed block may be partial and would not be exported. Allow for one block gap.
Expand All @@ -192,7 +192,7 @@ class CardanoWorker extends BaseWorker {
}

let transactions = null;
if (this.lastBufferedBlock < 0) {
if (this.lastExportedBlock < 0) {
transactions = await this.getGenesisTransactions();
if (transactions.length === 0) {
throw new Error('Error getting Cardano genesis transactions');
Expand All @@ -213,7 +213,7 @@ class CardanoWorker extends BaseWorker {
transactions[i].primaryKey = this.lastPrimaryKey + i + 1;
}

this.lastBufferedBlock = transactions[transactions.length - 1].block.number;
this.lastExportedBlock = transactions[transactions.length - 1].block.number;
this.lastPrimaryKey += transactions.length;

return transactions;
Expand Down
6 changes: 3 additions & 3 deletions blockchains/erc20/erc20_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ class ERC20Worker extends BaseWorker {
}

getBlocksListInterval() {
if (this.lastBufferedBlock === -1 && this.blocksList.length > 0) {
if (this.lastExportedBlock === -1 && this.blocksList.length > 0) {
return {
success: true,
fromBlock: this.blocksList[0][0],
toBlock: this.blocksList[0][1]
};
}
while (this.blocksList.length > 0 && this.lastBufferedBlock >= this.blocksList[0][1]) {
while (this.blocksList.length > 0 && this.lastExportedBlock >= this.blocksList[0][1]) {
this.blocksList.shift();
}
if (this.blocksList.length === 0) {
Expand Down Expand Up @@ -139,7 +139,7 @@ class ERC20Worker extends BaseWorker {
this.lastPrimaryKey = events[events.length - 1].primaryKey;
}

this.lastBufferedBlock = interval.toBlock;
this.lastExportedBlock = interval.toBlock;
const resultEvents = events.concat(overwritten_events);

// If overwritten events have been generated, they need to be merged into the original events
Expand Down
24 changes: 12 additions & 12 deletions blockchains/eth/eth_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const BaseWorker = require('../../lib/worker_base');
const Web3Wrapper = require('./lib/web3_wrapper');
const { decodeTransferTrace } = require('./lib/decode_transfers');
const { FeesDecoder } = require('./lib/fees_decoder');
const { nextIntervalCalculator } = require('./lib/next_interval_calculator');
const { nextIntervalCalculator, isNewBlockAvailable } = require('./lib/next_interval_calculator');
const { WithdrawalsDecoder } = require('./lib/withdrawals_decoder');

class ETHWorker extends BaseWorker {
Expand Down Expand Up @@ -40,8 +40,7 @@ class ETHWorker extends BaseWorker {
fromBlock: this.web3Wrapper.parseNumberToHex(fromBlock),
toBlock: this.web3Wrapper.parseNumberToHex(toBlock)
}])
.then((data) => this.parseEthInternalTrx(data['result']))
.catch((err) => { throw err; });
.then((data) => this.parseEthInternalTrx(data['result']));
}

async fetchBlocks(fromBlock, toBlock) {
Expand Down Expand Up @@ -146,14 +145,15 @@ class ETHWorker extends BaseWorker {
return result;
}

async work(key) {
const result = await nextIntervalCalculator(this);
if (!result.success) return [];
this.lastBufferedBlock = result.toBlock;

logger.info(`Fetching transfer events for interval ${result.fromBlock}:${result.toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(result.fromBlock, result.toBlock);
const events = this.transformPastEvents(result.fromBlock, result.toBlock, traces, blocks, receipts);
async work() {
if (!(await isNewBlockAvailable(this))) {
return [];
}
const { fromBlock, toBlock } = nextIntervalCalculator(this);
this.lastExportedBlock = toBlock;
logger.info(`Fetching transfer events for interval ${fromBlock}:${toBlock}`);
const [traces, blocks, receipts] = await this.fetchData(fromBlock, toBlock);
const events = this.transformPastEvents(fromBlock, toBlock, traces, blocks, receipts);

if (events.length > 0) {
stableSort(events, transactionOrder);
Expand All @@ -164,7 +164,7 @@ class ETHWorker extends BaseWorker {
this.lastPrimaryKey += events.length;
}

return [key, events];
return events;
}

async init() {
Expand Down
29 changes: 19 additions & 10 deletions blockchains/eth/lib/next_interval_calculator.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
/**
* A function that returns the appropriate interval,
* depending on the progress that the worker's made.
* If the exporter's caught up, we check for a new block. We then check whether the Node
* returns a valid block (sometimes the Node returns an early block, like 3 for example).
* We don't want to get the new blocks right away, so we set a sleep variable. On the next iteration
* the function will return the appropriate array of intervals.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {Array} The corresponding interval
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {boolean} Boolean variable set to true or false, depending on whether there's a new block or not
*/
async function nextIntervalCalculator(worker) {
if (worker.lastBufferedBlock >= worker.lastConfirmedBlock) {
async function isNewBlockAvailable(worker) {
if (worker.lastExportedBlock >= worker.lastConfirmedBlock) {
const newConfirmedBlock = await worker.web3Wrapper.getBlockNumber() - worker.settings.CONFIRMATIONS;
if (worker.lastConfirmedBlock < newConfirmedBlock) {
worker.lastConfirmedBlock = newConfirmedBlock;
}
worker.sleepTimeMsec = worker.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
return { success: false };

return false;
}

return true;
}

/**
* A function that returns the appropriate interval,
* depending on the progress that the worker's made.
* @param {BaseWorker} worker A worker instance, inherriting the BaseWorker class.
* @returns {object} The corresponding interval
*/
function nextIntervalCalculator(worker) {
worker.sleepTimeMsec = 0;
return {
success: true,
fromBlock: worker.lastBufferedBlock + 1,
toBlock: Math.min(worker.lastBufferedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
fromBlock: worker.lastExportedBlock + 1,
toBlock: Math.min(worker.lastExportedBlock + worker.settings.BLOCK_INTERVAL, worker.lastConfirmedBlock)
};
}

module.exports = {
isNewBlockAvailable,
nextIntervalCalculator
};
2 changes: 1 addition & 1 deletion blockchains/matic/matic_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MaticWorker extends BaseWorker {
this.lastPrimaryKey = events[events.length - 1].primaryKey;
}

this.lastBufferedBlock = result.toBlock;
this.lastExportedBlock = result.toBlock;
return events;

}
Expand Down
8 changes: 4 additions & 4 deletions blockchains/receipts/receipts_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ReceiptsWorker extends BaseWorker {
}

async work() {
if (this.lastConfirmedBlock === this.lastBufferedBlock) {
if (this.lastConfirmedBlock === this.lastExportedBlock) {
this.sleepTimeMsec = this.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;

const newConfirmedBlock = await this.web3Wrapper.getBlockNumber() - this.settings.CONFIRMATIONS;
Expand All @@ -103,13 +103,13 @@ class ReceiptsWorker extends BaseWorker {
this.sleepTimeMsec = 0;
}

const toBlock = Math.min(this.lastBufferedBlock + this.settings.BLOCK_INTERVAL, this.lastConfirmedBlock);
const fromBlock = this.lastBufferedBlock + 1;
const toBlock = Math.min(this.lastExportedBlock + this.settings.BLOCK_INTERVAL, this.lastConfirmedBlock);
const fromBlock = this.lastExportedBlock + 1;

logger.info(`Fetching receipts for interval ${fromBlock}:${toBlock}`);
const receipts = await this.getReceiptsForBlocks(fromBlock, toBlock);

this.lastBufferedBlock = toBlock;
this.lastExportedBlock = toBlock;

return receipts;
}
Expand Down
8 changes: 4 additions & 4 deletions blockchains/utxo/utxo_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class UtxoWorker extends BaseWorker {
}

async work() {
if (this.lastConfirmedBlock === this.lastBufferedBlock) {
if (this.lastConfirmedBlock === this.lastExportedBlock) {
this.sleepTimeMsec = this.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;

const blockchainInfo = await this.sendRequestWithRetry('getblockchaininfo', []);
Expand All @@ -102,10 +102,10 @@ class UtxoWorker extends BaseWorker {
this.sleepTimeMsec = 0;
}

const numConcurrentRequests = Math.min(this.MAX_CONCURRENT_REQUESTS, this.lastConfirmedBlock - this.lastBufferedBlock);
const requests = Array.from({ length: numConcurrentRequests }, (_, i) => this.fetchBlock(this.lastBufferedBlock + 1 + i));
const numConcurrentRequests = Math.min(this.MAX_CONCURRENT_REQUESTS, this.lastConfirmedBlock - this.lastExportedBlock);
const requests = Array.from({ length: numConcurrentRequests }, (_, i) => this.fetchBlock(this.lastExportedBlock + 1 + i));
const blocks = await Promise.all(requests);
this.lastBufferedBlock += blocks.length;
this.lastExportedBlock += blocks.length;
return blocks;
}
}
Expand Down
8 changes: 4 additions & 4 deletions blockchains/xrp/xrp_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class XRPWorker extends BaseWorker {
}

async work() {
if (this.lastConfirmedBlock === this.lastBufferedBlock) {
if (this.lastConfirmedBlock === this.lastExportedBlock) {
this.sleepTimeMsec = this.settings.LOOP_INTERVAL_CURRENT_MODE_SEC * 1000;
const lastValidatedLedger = await this.connectionSend(this.connections[0], {
command: 'ledger',
Expand All @@ -165,8 +165,8 @@ class XRPWorker extends BaseWorker {
} else {
this.sleepTimeMsec = 0;
}
const toBlock = Math.min(this.lastBufferedBlock + this.settings.SEND_BATCH_SIZE, this.lastConfirmedBlock);
let fromBlock = this.lastBufferedBlock + 1;
const toBlock = Math.min(this.lastExportedBlock + this.settings.SEND_BATCH_SIZE, this.lastConfirmedBlock);
let fromBlock = this.lastExportedBlock + 1;

const requests = [];
let transfers = [];
Expand All @@ -182,7 +182,7 @@ class XRPWorker extends BaseWorker {
});
this.checkAllTransactionsValid(ledgers);

this.lastBufferedBlock = toBlock;
this.lastExportedBlock = toBlock;
if (ledgers.length > 0) {
this.lastPrimaryKey = ledgers[ledgers.length - 1].primaryKey;
}
Expand Down
16 changes: 6 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,21 @@ class Main {
metrics.currentBlock.set(this.worker.lastConfirmedBlock);
metrics.requestsCounter.inc(this.worker.getNewRequestsCount());
metrics.requestsResponseTime.observe(new Date() - this.worker.lastRequestStartTime);
metrics.lastExportedBlock.set(this.worker.lastBufferedBlock);
metrics.lastExportedBlock.set(this.worker.lastExportedBlock);
}

async waitOnStoreEvents() {
const bufferCopy = this.taskManager.retrieveCompleted();
await this.exporter.storeEvents(bufferCopy);
this.lastProcessedPosition = this.taskManager.lastProcessedPosition;
this.lastProcessedPosition = {
primaryKey: bufferCopy[bufferCopy.length - 1].primaryKey,
blockNumber: bufferCopy[bufferCopy.length - 1].blockNumber
};
}

async workLoop() {
while (this.shouldWork) {
if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) {
const currentKey = this.lastWorkerKey;
this.taskManager.pushToQueue(() => this.worker.work(currentKey).catch(err => {
logger.error(err.stack);
this.shouldWork = false;
}));
this.lastWorkerKey++;
}
if (this.taskManager.queue.size < constantsBase.PQUEUE_MAX_SIZE) this.taskManager.pushToQueue(this.worker.work);
this.worker.lastRequestStartTime = new Date();
this.worker.lastExportTime = Date.now();

Expand Down
2 changes: 1 addition & 1 deletion integration_test/src/binance-native-token.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ describe('BEP20 worker test', function () {
};
const bep20Worker = new worker.worker(settings);
await bep20Worker.init();
bep20Worker.lastBufferedBlock = 19999999;
bep20Worker.lastExportedBlock = 19999999;

let expectedDataPosition = 0;
for (let i = 0; i < 5; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('BEP20 worker test', function () {
};
const erc20Worker = new worker.worker(settings);
await erc20Worker.init(new MockExporter());
erc20Worker.lastBufferedBlock = 9999999;
erc20Worker.lastExportedBlock = 9999999;

let expectedDataPosition = 0;
for (let i = 0; i < 10; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion integration_test/src/matic.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('Matic worker test', function () {
};
const maticWorker = new worker.worker(settings);
await maticWorker.init(new MockExporter());
maticWorker.lastBufferedBlock = 49999999;
maticWorker.lastExportedBlock = 49999999;

let expectedDataPosition = 0;
for (let i = 0; i < 4; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion integration_test/src/optimism-erc20.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ describe('Optimism worker test', function () {
};
const erc20Worker = new worker.worker(settings);
await erc20Worker.init(new MockExporter());
erc20Worker.lastBufferedBlock = 99999999;
erc20Worker.lastExportedBlock = 99999999;

let expectedDataPosition = 0;
for (let i = 0; i < 10; ++i) {
Expand Down
Loading

0 comments on commit 1980390

Please sign in to comment.