diff --git a/migrations/1703177555075_mempool-digest-table.js b/migrations/1703177555075_mempool-digest-table.js
new file mode 100644
index 0000000000..af0185ac74
--- /dev/null
+++ b/migrations/1703177555075_mempool-digest-table.js
@@ -0,0 +1,27 @@
+/* eslint-disable camelcase */
+
+exports.shorthands = undefined;
+
+exports.up = pgm => {
+  pgm.addColumn('chain_tip', {
+    mempool_tx_count: {
+      type: 'int',
+      default: 0,
+    },
+    mempool_updated_at: {
+      type: 'timestamptz',
+      default: pgm.func('(NOW())'),
+    },
+  });
+  pgm.sql(`
+    UPDATE chain_tip SET
+      mempool_tx_count = (SELECT COUNT(*)::int FROM mempool_txs WHERE pruned = FALSE),
+      mempool_updated_at = NOW()
+  `);
+  pgm.alterColumn('chain_tip', 'mempool_tx_count', { notNull: true });
+  pgm.alterColumn('chain_tip', 'mempool_updated_at', { notNull: true });
+};
+
+exports.down = pgm => {
+  pgm.dropColumn('chain_tip', ['mempool_tx_count', 'mempool_updated_at']);
+};
diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts
index e33acd8651..88d1776816 100644
--- a/src/datastore/pg-store.ts
+++ b/src/datastore/pg-store.ts
@@ -1306,12 +1306,13 @@ export class PgStore {
     const unanchoredTxs: string[] = !includeUnanchored
       ? (await this.getUnanchoredTxsInternal(sql)).txs.map(tx => tx.tx_id)
       : [];
+    // If caller is not filtering by any param, get the tx count from the `chain_tip` table.
+    const count =
+      senderAddress || recipientAddress || address
+        ? sql`(COUNT(*) OVER())::int AS count`
+        : sql`(SELECT mempool_tx_count FROM chain_tip) AS count`;
     const resultQuery = await sql<(MempoolTxQueryResult & { count: number })[]>`
-      SELECT ${unsafeCols(sql, [
-        ...MEMPOOL_TX_COLUMNS,
-        abiColumn('mempool_txs'),
-        '(COUNT(*) OVER())::INTEGER AS count',
-      ])}
+      SELECT ${unsafeCols(sql, [...MEMPOOL_TX_COLUMNS, abiColumn('mempool_txs')])}, ${count}
       FROM mempool_txs
       WHERE ${
         address
@@ -1355,7 +1356,9 @@
    * @returns `FoundOrNot` object with a possible `digest` string.
    */
   async getMempoolTxDigest(): Promise<FoundOrNot<{ digest: string }>> {
-    const result = await this.sql<{ digest: string }[]>`SELECT digest FROM mempool_digest`;
+    const result = await this.sql<{ digest: string }[]>`
+      SELECT date_part('epoch', mempool_updated_at)::text AS digest FROM chain_tip
+    `;
     if (result.length === 0) {
       return { found: false } as const;
     }
diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts
index f2224c99c9..55e9ca48a5 100644
--- a/src/datastore/pg-write-store.ts
+++ b/src/datastore/pg-write-store.ts
@@ -705,8 +705,6 @@ export class PgWriteStore extends PgStore {
       `;
     });
 
-    await this.refreshMaterializedView('mempool_digest');
-
     if (this.notifier) {
       for (const microblock of dbMicroblocks) {
         await this.notifier.sendMicroblock({ microblockHash: microblock.microblock_hash });
@@ -736,20 +734,28 @@ export class PgWriteStore extends PgStore {
   // NOTE: this is essentially a work-around for whatever bug is causing the underlying problem.
   async reconcileMempoolStatus(sql: PgSqlClient): Promise<void> {
     const txsResult = await sql<{ tx_id: string }[]>`
-      UPDATE mempool_txs
-      SET pruned = true
-      FROM txs
-      WHERE
-        mempool_txs.tx_id = txs.tx_id AND
-        mempool_txs.pruned = false AND
-        txs.canonical = true AND
-        txs.microblock_canonical = true AND
-        txs.status IN ${sql([
-          DbTxStatus.Success,
-          DbTxStatus.AbortByResponse,
-          DbTxStatus.AbortByPostCondition,
-        ])}
-      RETURNING mempool_txs.tx_id
+      WITH pruned AS (
+        UPDATE mempool_txs
+        SET pruned = true
+        FROM txs
+        WHERE
+          mempool_txs.tx_id = txs.tx_id AND
+          mempool_txs.pruned = false AND
+          txs.canonical = true AND
+          txs.microblock_canonical = true AND
+          txs.status IN ${sql([
+            DbTxStatus.Success,
+            DbTxStatus.AbortByResponse,
+            DbTxStatus.AbortByPostCondition,
+          ])}
+        RETURNING mempool_txs.tx_id
+      ),
+      count_update AS (
+        UPDATE chain_tip SET
+          mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
+          mempool_updated_at = NOW()
+      )
+      SELECT tx_id FROM pruned
     `;
     if (txsResult.length > 0) {
       const txs = txsResult.map(tx => tx.tx_id);
@@ -1656,89 +1662,95 @@ export class PgWriteStore extends PgStore {
     return result.count;
   }
 
-  async insertDbMempoolTx(
-    tx: DbMempoolTxRaw,
+  async insertDbMempoolTxs(
+    txs: DbMempoolTxRaw[],
     chainTip: DbChainTip,
     sql: PgSqlClient
-  ): Promise<boolean> {
-    const values: MempoolTxInsertValues = {
-      pruned: tx.pruned,
-      tx_id: tx.tx_id,
-      raw_tx: tx.raw_tx,
-      type_id: tx.type_id,
-      anchor_mode: tx.anchor_mode,
-      status: tx.status,
-      receipt_time: tx.receipt_time,
-      receipt_block_height: chainTip.block_height,
-      post_conditions: tx.post_conditions,
-      nonce: tx.nonce,
-      fee_rate: tx.fee_rate,
-      sponsored: tx.sponsored,
-      sponsor_nonce: tx.sponsor_nonce ?? null,
-      sponsor_address: tx.sponsor_address ?? null,
-      sender_address: tx.sender_address,
-      origin_hash_mode: tx.origin_hash_mode,
-      token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
-      token_transfer_amount: tx.token_transfer_amount ?? null,
-      token_transfer_memo: tx.token_transfer_memo ?? null,
-      smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
-      smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
-      smart_contract_source_code: tx.smart_contract_source_code ?? null,
-      contract_call_contract_id: tx.contract_call_contract_id ?? null,
-      contract_call_function_name: tx.contract_call_function_name ?? null,
-      contract_call_function_args: tx.contract_call_function_args ?? null,
-      poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
-      poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
-      coinbase_payload: tx.coinbase_payload ?? null,
-      coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
-    };
-    const result = await sql`
-      INSERT INTO mempool_txs ${sql(values)}
-      ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
-    `;
-    if (result.count !== 1) {
-      const errMsg = `A duplicate transaction was attempted to be inserted into the mempool_txs table: ${tx.tx_id}`;
-      logger.warn(errMsg);
-      return false;
-    } else {
-      return true;
+  ): Promise<string[]> {
+    const txIds: string[] = [];
+    for (const batch of batchIterate(txs, 500)) {
+      const values: MempoolTxInsertValues[] = batch.map(tx => ({
+        pruned: tx.pruned,
+        tx_id: tx.tx_id,
+        raw_tx: tx.raw_tx,
+        type_id: tx.type_id,
+        anchor_mode: tx.anchor_mode,
+        status: tx.status,
+        receipt_time: tx.receipt_time,
+        receipt_block_height: chainTip.block_height,
+        post_conditions: tx.post_conditions,
+        nonce: tx.nonce,
+        fee_rate: tx.fee_rate,
+        sponsored: tx.sponsored,
+        sponsor_nonce: tx.sponsor_nonce ?? null,
+        sponsor_address: tx.sponsor_address ?? null,
+        sender_address: tx.sender_address,
+        origin_hash_mode: tx.origin_hash_mode,
+        token_transfer_recipient_address: tx.token_transfer_recipient_address ?? null,
+        token_transfer_amount: tx.token_transfer_amount ?? null,
+        token_transfer_memo: tx.token_transfer_memo ?? null,
+        smart_contract_clarity_version: tx.smart_contract_clarity_version ?? null,
+        smart_contract_contract_id: tx.smart_contract_contract_id ?? null,
+        smart_contract_source_code: tx.smart_contract_source_code ?? null,
+        contract_call_contract_id: tx.contract_call_contract_id ?? null,
+        contract_call_function_name: tx.contract_call_function_name ?? null,
+        contract_call_function_args: tx.contract_call_function_args ?? null,
+        poison_microblock_header_1: tx.poison_microblock_header_1 ?? null,
+        poison_microblock_header_2: tx.poison_microblock_header_2 ?? null,
+        coinbase_payload: tx.coinbase_payload ?? null,
+        coinbase_alt_recipient: tx.coinbase_alt_recipient ?? null,
+      }));
+      const result = await sql<{ tx_id: string }[]>`
+        WITH inserted AS (
+          INSERT INTO mempool_txs ${sql(values)}
+          ON CONFLICT ON CONSTRAINT unique_tx_id DO NOTHING
+          RETURNING tx_id
+        ),
+        count_update AS (
+          UPDATE chain_tip SET
+            mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM inserted),
+            mempool_updated_at = NOW()
+        )
+        SELECT tx_id FROM inserted
+      `;
+      txIds.push(...result.map(r => r.tx_id));
     }
+    return txIds;
   }
 
   async updateMempoolTxs({ mempoolTxs: txs }: { mempoolTxs: DbMempoolTxRaw[] }): Promise<void> {
     const updatedTxIds: string[] = [];
     await this.sqlWriteTransaction(async sql => {
       const chainTip = await this.getChainTip();
-      for (const tx of txs) {
-        const inserted = await this.insertDbMempoolTx(tx, chainTip, sql);
-        if (inserted) {
-          updatedTxIds.push(tx.tx_id);
-        }
-      }
+      updatedTxIds.push(...(await this.insertDbMempoolTxs(txs, chainTip, sql)));
       if (!this.isEventReplay) {
         await this.reconcileMempoolStatus(sql);
-
         const mempoolStats = await this.getMempoolStatsInternal({ sql });
         this.eventEmitter.emit('mempoolStatsUpdate', mempoolStats);
       }
     });
-    await this.refreshMaterializedView('mempool_digest');
     for (const txId of updatedTxIds) {
-      await this.notifier?.sendTx({ txId: txId });
+      await this.notifier?.sendTx({ txId });
     }
   }
 
   async dropMempoolTxs({ status, txIds }: { status: DbTxStatus; txIds: string[] }): Promise<void> {
-    const updateResults = await this.sql<MempoolTxQueryResult[]>`
-      UPDATE mempool_txs
-      SET pruned = true, status = ${status}
-      WHERE tx_id IN ${this.sql(txIds)}
-      RETURNING ${this.sql(MEMPOOL_TX_COLUMNS)}
+    const updateResults = await this.sql<{ tx_id: string }[]>`
+      WITH pruned AS (
+        UPDATE mempool_txs
+        SET pruned = TRUE, status = ${status}
+        WHERE tx_id IN ${this.sql(txIds)} AND pruned = FALSE
+        RETURNING tx_id
+      ),
+      count_update AS (
+        UPDATE chain_tip SET
+          mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
+          mempool_updated_at = NOW()
+      )
+      SELECT tx_id FROM pruned
     `;
-    const updatedTxs = updateResults.map(r => parseMempoolTxQueryResult(r));
-    await this.refreshMaterializedView('mempool_digest');
-    for (const tx of updatedTxs) {
-      await this.notifier?.sendTx({ txId: tx.tx_id });
+    for (const txId of updateResults.map(r => r.tx_id)) {
+      await this.notifier?.sendTx({ txId });
     }
   }
 
@@ -2326,19 +2338,24 @@ export class PgWriteStore extends PgStore {
    * @param txIds - List of transactions to update in the mempool
    */
   async restoreMempoolTxs(sql: PgSqlClient, txIds: string[]): Promise<{ restoredTxs: string[] }> {
-    if (txIds.length === 0) {
-      // Avoid an unnecessary query.
-      return { restoredTxs: [] };
-    }
+    if (txIds.length === 0) return { restoredTxs: [] };
     for (const txId of txIds) {
       logger.debug(`Restoring mempool tx: ${txId}`);
     }
     const updatedRows = await sql<{ tx_id: string }[]>`
-      UPDATE mempool_txs
-      SET pruned = false
-      WHERE tx_id IN ${sql(txIds)}
-      RETURNING tx_id
+      WITH restored AS (
+        UPDATE mempool_txs
+        SET pruned = FALSE
+        WHERE tx_id IN ${sql(txIds)} AND pruned = TRUE
+        RETURNING tx_id
+      ),
+      count_update AS (
+        UPDATE chain_tip SET
+          mempool_tx_count = mempool_tx_count + (SELECT COUNT(*) FROM restored),
+          mempool_updated_at = NOW()
+      )
+      SELECT tx_id FROM restored
     `;
     const updatedTxs = updatedRows.map(r => r.tx_id);
 
@@ -2393,13 +2410,20 @@ export class PgWriteStore extends PgStore {
       logger.debug(`Pruning mempool tx: ${txId}`);
     }
     const updateResults = await sql<{ tx_id: string }[]>`
-      UPDATE mempool_txs
-      SET pruned = true
-      WHERE tx_id IN ${sql(txIds)}
-      RETURNING tx_id
+      WITH pruned AS (
+        UPDATE mempool_txs
+        SET pruned = true
+        WHERE tx_id IN ${sql(txIds)} AND pruned = FALSE
+        RETURNING tx_id
+      ),
+      count_update AS (
+        UPDATE chain_tip SET
+          mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
+          mempool_updated_at = NOW()
+      )
+      SELECT tx_id FROM pruned
     `;
-    const removedTxs = updateResults.map(r => r.tx_id);
-    return { removedTxs: removedTxs };
+    return { removedTxs: updateResults.map(r => r.tx_id) };
   }
 
   /**
@@ -2408,27 +2432,26 @@
    * @returns List of deleted `tx_id`s
    */
   async deleteGarbageCollectedMempoolTxs(sql: PgSqlClient): Promise<{ deletedTxs: string[] }> {
-    // Get threshold block.
-    const blockThreshold = process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? 256;
-    const cutoffResults = await sql<{ block_height: number }[]>`
-      SELECT (MAX(block_height) - ${blockThreshold}) AS block_height
-      FROM blocks
-      WHERE canonical = TRUE
-    `;
-    if (cutoffResults.length != 1) {
-      return { deletedTxs: [] };
-    }
-    const cutoffBlockHeight = cutoffResults[0].block_height;
-    // Delete every mempool tx that came before that block.
+    const blockThreshold = parseInt(
+      process.env['STACKS_MEMPOOL_TX_GARBAGE_COLLECTION_THRESHOLD'] ?? '256'
+    );
     // TODO: Use DELETE instead of UPDATE once we implement a non-archival API replay mode.
     const deletedTxResults = await sql<{ tx_id: string }[]>`
-      UPDATE mempool_txs
-      SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
-      WHERE pruned = FALSE AND receipt_block_height < ${cutoffBlockHeight}
-      RETURNING tx_id
+      WITH pruned AS (
+        UPDATE mempool_txs
+        SET pruned = TRUE, status = ${DbTxStatus.DroppedApiGarbageCollect}
+        WHERE pruned = FALSE
+          AND receipt_block_height <= (SELECT block_height - ${blockThreshold} FROM chain_tip)
+        RETURNING tx_id
+      ),
+      count_update AS (
+        UPDATE chain_tip SET
+          mempool_tx_count = mempool_tx_count - (SELECT COUNT(*) FROM pruned),
+          mempool_updated_at = NOW()
+      )
+      SELECT tx_id FROM pruned
     `;
-    const deletedTxs = deletedTxResults.map(r => r.tx_id);
-    return { deletedTxs: deletedTxs };
+    return { deletedTxs: deletedTxResults.map(r => r.tx_id) };
   }
 
   async markEntitiesCanonical(
diff --git a/src/event-replay/event-replay.ts b/src/event-replay/event-replay.ts
index 53cb077509..172c13e5d1 100644
--- a/src/event-replay/event-replay.ts
+++ b/src/event-replay/event-replay.ts
@@ -167,7 +167,6 @@ export async function importEventsFromTsv(
       responses.push(response);
     }
   }
-  await db.finishEventReplay();
   console.log(`Event import and playback successful.`);
   await eventServer.closeAsync();
   await db.close();
diff --git a/src/tests/cache-control-tests.ts b/src/tests/cache-control-tests.ts
index 0718f0574b..a2d88c960d 100644
--- a/src/tests/cache-control-tests.ts
+++ b/src/tests/cache-control-tests.ts
@@ -373,7 +373,7 @@ describe('cache-control tests', () => {
     const request1 = await supertest(api.server).get('/extended/v1/tx/mempool');
     expect(request1.status).toBe(200);
     expect(request1.type).toBe('application/json');
-    expect(request1.headers['etag']).toEqual('"0"');
+    const etag0 = request1.headers['etag'];
 
     // Add mempool txs.
     const mempoolTx1 = testMempoolTx({ tx_id: '0x1101' });
@@ -386,6 +386,7 @@
     expect(request2.type).toBe('application/json');
     expect(request2.headers['etag']).toBeTruthy();
     const etag1 = request2.headers['etag'];
+    expect(etag1).not.toEqual(etag0);
 
     // Cache works with valid ETag.
     const request3 = await supertest(api.server)
@@ -416,13 +417,12 @@
       .build();
     await db.update(block2);
 
-    // Cache is now a miss and ETag is zero because mempool is empty.
+    // Cache is now a miss.
     const request5 = await supertest(api.server)
       .get('/extended/v1/tx/mempool')
       .set('If-None-Match', etag2);
     expect(request5.status).toBe(200);
     expect(request5.type).toBe('application/json');
-    expect(request5.headers['etag']).toEqual('"0"');
     const etag3 = request5.headers['etag'];
 
     // Restore a tx back into the mempool by making its anchor block non-canonical.
@@ -449,7 +449,7 @@
       .set('If-None-Match', etag3);
     expect(request6.status).toBe(200);
     expect(request6.type).toBe('application/json');
-    expect(request6.headers['etag']).toEqual(etag2);
+    expect(request6.headers['etag']).not.toEqual(etag3);
     const etag4 = request6.headers['etag'];
 
     // Garbage collect all txs.
@@ -463,25 +463,13 @@
       .build();
     await db.update(block4);
 
-    // ETag zero once again.
+    // ETag changes once again.
     const request7 = await supertest(api.server)
       .get('/extended/v1/tx/mempool')
      .set('If-None-Match', etag4);
     expect(request7.status).toBe(200);
     expect(request7.type).toBe('application/json');
-    expect(request7.headers['etag']).toEqual('"0"');
-
-    // Simulate an incompatible pg version (without `bit_xor`).
-    await client.begin(async sql => {
-      await sql`DROP MATERIALIZED VIEW mempool_digest`;
-      await sql`CREATE MATERIALIZED VIEW mempool_digest AS (SELECT NULL AS digest)`;
-    });
-
-    // ETag is undefined as if mempool cache did not exist.
-    const request8 = await supertest(api.server).get('/extended/v1/tx/mempool');
-    expect(request8.status).toBe(200);
-    expect(request8.type).toBe('application/json');
-    expect(request8.headers['etag']).toBeUndefined();
+    expect(request7.headers['etag']).not.toEqual(etag4);
   });
 
   test('transaction cache control', async () => {
diff --git a/src/tests/mempool-tests.ts b/src/tests/mempool-tests.ts
index b31952e829..b44440fae4 100644
--- a/src/tests/mempool-tests.ts
+++ b/src/tests/mempool-tests.ts
@@ -1489,7 +1489,7 @@ describe('mempool tests', () => {
     // directly inserting the mempool-tx and mined-tx, bypassing the normal update functions.
     await db.updateBlock(db.sql, dbBlock1);
     const chainTip = await db.getChainTip();
-    await db.insertDbMempoolTx(mempoolTx, chainTip, db.sql);
+    await db.insertDbMempoolTxs([mempoolTx], chainTip, db.sql);
     await db.updateTx(db.sql, dbTx1);
 
     // Verify tx shows up in mempool (non-pruned)
@@ -1497,6 +1497,8 @@
       `/extended/v1/address/${mempoolTx.sender_address}/mempool`
     );
     expect(mempoolResult1.body.results[0].tx_id).toBe(txId);
+    const mempoolCount1 = await supertest(api.server).get(`/extended/v1/tx/mempool`);
+    expect(mempoolCount1.body.total).toBe(1);
     const mempoolResult2 = await supertest(api.server).get(
       `/extended/v1/tx/mempool?sender_address=${senderAddress}`
     );
@@ -1519,6 +1521,8 @@
       `/extended/v1/address/${mempoolTx.sender_address}/mempool`
     );
     expect(mempoolResult3.body.results).toHaveLength(0);
+    const mempoolCount2 = await supertest(api.server).get(`/extended/v1/tx/mempool`);
+    expect(mempoolCount2.body.total).toBe(0);
     const mempoolResult4 = await supertest(api.server).get(
       `/extended/v1/tx/mempool?sender_address=${senderAddress}`
     );
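
Reviewer note, appended for context (not part of the diff): with the `mempool_digest` materialized view gone, the mempool ETag is now simply the epoch timestamp of `chain_tip.mempool_updated_at`, which every insert, prune, and restore path above bumps in the same statement that adjusts `mempool_tx_count`. Below is a minimal sketch of how a cache handler could consume `getMempoolTxDigest()`; the Express-style wiring, the `mempoolEtagHandler` name, and the import path are illustrative assumptions, not code from this PR:

```ts
import { NextFunction, Request, Response } from 'express';
import { PgStore } from '../datastore/pg-store';

// Hypothetical middleware factory: derives the mempool ETag from the digest
// exposed by getMempoolTxDigest(), i.e. the epoch seconds of
// chain_tip.mempool_updated_at rendered as a string.
export function mempoolEtagHandler(db: PgStore) {
  return async (req: Request, res: Response, next: NextFunction): Promise<void> => {
    const digest = await db.getMempoolTxDigest();
    if (!digest.found) {
      // No digest row available; skip caching entirely.
      next();
      return;
    }
    const etag = `"${digest.result.digest}"`;
    if (req.headers['if-none-match'] === etag) {
      // Every mempool insert/prune/restore bumps mempool_updated_at, so a
      // matching ETag means the client's cached response is still current.
      res.status(304).end();
      return;
    }
    res.setHeader('ETag', etag);
    next();
  };
}
```

Because the timestamp is written transactionally alongside the mempool mutation itself, readers can never observe a `mempool_tx_count`/ETag pair that is out of sync with the contents of `mempool_txs`.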