Skip to content

Commit

Permalink
fix: optimize mempool transaction reads and writes (#1781) (#1792)
Browse files Browse the repository at this point in the history
* fix: change mempool_digest into a table

* fix: change digest to be last updated timestamp

* fix: build

* fix: update count on reconcile

* test: mempool renconcile
  • Loading branch information
rafaelcr authored Jan 2, 2024
1 parent aa287f8 commit 2700642
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 136 deletions.
27 changes: 27 additions & 0 deletions migrations/1703177555075_mempool-digest-table.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* eslint-disable camelcase */

exports.shorthands = undefined;

exports.up = pgm => {
pgm.addColumn('chain_tip', {
mempool_tx_count: {
type: 'int',
default: 0,
},
mempool_updated_at: {
type: 'timestamptz',
default: pgm.func('(NOW())'),
},
});
pgm.sql(`
UPDATE chain_tip SET
mempool_tx_count = (SELECT COUNT(*)::int FROM mempool_txs WHERE pruned = FALSE),
mempool_updated_at = NOW()
`);
pgm.alterColumn('chain_tip', 'mempool_tx_count', { notNull: true });
pgm.alterColumn('chain_tip', 'mempool_updated_at', { notNull: true });
};

exports.down = pgm => {
pgm.dropColumn('chain_tip', ['mempool_tx_count', 'mempool_updated_at']);
};
15 changes: 9 additions & 6 deletions src/datastore/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1306,12 +1306,13 @@ export class PgStore {
const unanchoredTxs: string[] = !includeUnanchored
? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id)
: [];
// If caller is not filtering by any param, get the tx count from the `chain_tip` table.
const count =
senderAddress || recipientAddress || address
? sql`(COUNT(*) OVER())::int AS count`
: sql`(SELECT mempool_tx_count FROM chain_tip) AS count`;
const resultQuery = await sql<(MempoolTxQueryResult & { count: number })[]>`
SELECT ${unsafeCols(sql, [
...MEMPOOL_TX_COLUMNS,
abiColumn('mempool_txs'),
'(COUNT(*) OVER())::INTEGER AS count',
])}
SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}, ${count}
FROM mempool_txs
WHERE ${
address
Expand Down Expand Up @@ -1355,7 +1356,9 @@ export class PgStore {
* @returns `FoundOrNot` object with a possible `digest` string.
*/
async getMempoolTxDigest(): Promise<FoundOrNot<{ digest: string }>> {
const result = await this.sql<{ digest: string }[]>`SELECT digest FROM mempool_digest`;
const result = await this.sql<{ digest: string }[]>`
SELECT date_part('epoch', mempool_updated_at)::text AS digest FROM chain_tip
`;
if (result.length === 0) {
return { found: false } as const;
}
Expand Down
243 changes: 133 additions & 110 deletions src/datastore/pg-write-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,8 +705,6 @@ export class PgWriteStore extends PgStore {
`;
});

await this.refreshMaterializedView('mempool_digest');

if (this.notifier) {
for (const microblock of dbMicroblocks) {
await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
Expand Down Expand Up @@ -736,20 +734,28 @@ export class PgWriteStore extends PgStore {
// NOTE: this is essentially a work-around for whatever bug is causing the underlying problem.
async reconcileMempoolStatus(sql: PgSqlClient): Promise<void> {
const txsResult = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = true
FROM txs
WHERE
mempool_txs.tx_id = txs.tx_id AND
mempool_txs.pruned = false AND
txs.canonical = true AND
txs.microblock_canonical = true AND
txs.status IN ${sql([
DbTxStatus.Success,
DbTxStatus.AbortByResponse,
DbTxStatus.AbortByPostCondition,
])}
RETURNING mempool_txs.tx_id
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true
FROM txs
WHERE
mempool_txs.tx_id = txs.tx_id AND
mempool_txs.pruned = false AND
txs.canonical = true AND
txs.microblock_canonical = true AND
txs.status IN ${sql([
DbTxStatus.Success,
DbTxStatus.AbortByResponse,
DbTxStatus.AbortByPostCondition,
])}
RETURNING mempool_txs.tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
if (txsResult.length > 0) {
const txs = txsResult.map(tx => tx.tx_id);
Expand Down Expand Up @@ -1656,89 +1662,95 @@ export class PgWriteStore extends PgStore {
return result.count;
}

async insertDbMempoolTx(
tx: DbMempoolTxRaw,
async insertDbMempoolTxs(
txs: DbMempoolTxRaw[],
chainTip: DbChainTip,
sql: PgSqlClient
): Promise<boolean> {
const values: MempoolTxInsertValues = {
pruned: tx.pruned,
tx_id: tx.tx_id,
raw_tx: tx.raw_tx,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
receipt_time: tx.receipt_time,
receipt_block_height: chainTip.block_height,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
};
const result = await sql`
INSERT INTO mempool_txs ${sql(values)}
ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
`;
if (result.count !== 1) {
const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
logger.warn(errMsg);
return false;
} else {
return true;
): Promise<string[]> {
const txIds: string[] = [];
for (const batch of batchIterate(txs, 500)) {
const values: MempoolTxInsertValues[] = batch.map(tx => ({
pruned: tx.pruned,
tx_id: tx.tx_id,
raw_tx: tx.raw_tx,
type_id: tx.type_id,
anchor_mode: tx.anchor_mode,
status: tx.status,
receipt_time: tx.receipt_time,
receipt_block_height: chainTip.block_height,
post_conditions: tx.post_conditions,
nonce: tx.nonce,
fee_rate: tx.fee_rate,
sponsored: tx.sponsored,
sponsor_nonce: tx.sponsor_nonce ?? null,
sponsor_address: tx.sponsor_address ?? null,
sender_address: tx.sender_address,
origin_hash_mode: tx.origin_hash_mode,
token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
token_transfer_amount: tx.token_transfer_amount ?? null,
token_transfer_memo: tx.token_transfer_memo ?? null,
smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
smart_contract_source_code: tx.smart_contract_source_code ?? null,
contract_call_contract_id: tx.contract_call_contract_id ?? null,
contract_call_function_name: tx.contract_call_function_name ?? null,
contract_call_function_args: tx.contract_call_function_args ?? null,
poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
coinbase_payload: tx.coinbase_payload ?? null,
coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
}));
const result = await sql<{ tx_id: string }[]>`
WITH inserted AS (
INSERT INTO mempool_txs ${sql(values)}
ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM inserted),
mempool_updated_at = NOW()
)
SELECT tx_id FROM inserted
`;
txIds.push(...result.map(r => r.tx_id));
}
return txIds;
}

async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTxRaw[] }): Promise<void> {
const updatedTxIds: string[] = [];
await this.sqlWriteTransaction(async sql => {
const chainTip = await this.getChainTip();
for (const tx of txs) {
const inserted = await this.insertDbMempoolTx(tx, chainTip, sql);
if (inserted) {
updatedTxIds.push(tx.tx_id);
}
}
updatedTxIds.push(...(await this.insertDbMempoolTxs(txs, chainTip, sql)));
if (!this.isEventReplay) {
await this.reconcileMempoolStatus(sql);

const mempoolStats = await this.getMempoolStatsInternal({ sql });
this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
}
});
await this.refreshMaterializedView('mempool_digest');
for (const txId of updatedTxIds) {
await this.notifier?.sendTx({ txId: txId });
await this.notifier?.sendTx({ txId });
}
}

async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> {
const updateResults = await this.sql<MempoolTxQueryResult[]>`
UPDATE mempool_txs
SET pruned = true, status = ${status}
WHERE tx_id IN ${this.sql(txIds)}
RETURNING ${this.sql(MEMPOOL_TX_COLUMNS)}
const updateResults = await this.sql<{ tx_id: string }[]>`
WITH pruned AS (
UPDATE mempool_txs
SET pruned = TRUE, status = ${status}
WHERE tx_id IN ${this.sql(txIds)} AND pruned = FALSE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
await this.refreshMaterializedView('mempool_digest');
for (const tx of updatedTxs) {
await this.notifier?.sendTx({ txId: tx.tx_id });
for (const txId of updateResults.map(r => r.tx_id)) {
await this.notifier?.sendTx({ txId });
}
}

Expand Down Expand Up @@ -2326,19 +2338,24 @@ export class PgWriteStore extends PgStore {
* @param txIds - List of transactions to update in the mempool
*/
async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> {
if (txIds.length === 0) {
// Avoid an unnecessary query.
return { restoredTxs: [] };
}
if (txIds.length === 0) return { restoredTxs: [] };
for (const txId of txIds) {
logger.debug(`Restoring mempool tx: ${txId}`);
}

const updatedRows = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = false
WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id
WITH restored AS (
UPDATE mempool_txs
SET pruned = FALSE
WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM restored),
mempool_updated_at = NOW()
)
SELECT tx_id FROM restored
`;

const updatedTxs = updatedRows.map(r => r.tx_id);
Expand Down Expand Up @@ -2393,13 +2410,20 @@ export class PgWriteStore extends PgStore {
logger.debug(`Pruning mempool tx: ${txId}`);
}
const updateResults = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = true
WHERE tx_id IN ${sql(txIds)}
RETURNING tx_id
WITH pruned AS (
UPDATE mempool_txs
SET pruned = true
WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const removedTxs = updateResults.map(r => r.tx_id);
return { removedTxs: removedTxs };
return { removedTxs: updateResults.map(r => r.tx_id) };
}

/**
Expand All @@ -2408,27 +2432,26 @@ export class PgWriteStore extends PgStore {
* @returns List of deleted `tx_id`s
*/
async deleteGarbageCollectedMempoolTxs(sql: PgSqlClient): Promise<{ deletedTxs: string[] }> {
// Get threshold block.
const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
const cutoffResults = await sql<{ block_height: number }[]>`
SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
FROM blocks
WHERE canonical = TRUE
`;
if (cutoffResults.length != 1) {
return { deletedTxs: [] };
}
const cutoffBlockHeight = cutoffResults[0].block_height;
// Delete every mempool tx that came before that block.
const blockThreshold = parseInt(
process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
);
// TODO: Use DELETE instead of UPDATE once we implement a non-archival API replay mode.
const deletedTxResults = await sql<{ tx_id: string }[]>`
UPDATE mempool_txs
SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
RETURNING tx_id
WITH pruned AS (
UPDATE mempool_txs
SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
WHERE pruned = FALSE
AND receipt_block_height <= (SELECT block_height - ${blockThreshold} FROM chain_tip)
RETURNING tx_id
),
count_update AS (
UPDATE chain_tip SET
mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
mempool_updated_at = NOW()
)
SELECT tx_id FROM pruned
`;
const deletedTxs = deletedTxResults.map(r => r.tx_id);
return { deletedTxs: deletedTxs };
return { deletedTxs: deletedTxResults.map(r => r.tx_id) };
}

async markEntitiesCanonical(
Expand Down
1 change: 0 additions & 1 deletion src/event-replay/event-replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ export async function importEventsFromTsv(
responses.push(response);
}
}
await db.finishEventReplay();
console.log(`Event import and playback successful.`);
await eventServer.closeAsync();
await db.close();
Expand Down
Loading

0 comments on commit 2700642

Please sign in to comment.