From 3d81b2e9c6d7cd875727d1907526b88dbecd6692 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 16 Jan 2025 11:01:10 -0600 Subject: [PATCH 1/7] feat: add prom metrics for current and previous tenure block counts --- src/datastore/pg-store.ts | 42 +++++++++++++++++++++++++ src/event-stream/event-server.ts | 53 ++++++++++++++++++++++++++------ 2 files changed, 86 insertions(+), 9 deletions(-) diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 92859f119..0b6773091 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4524,4 +4524,46 @@ export class PgStore extends BasePgStore { `; if (result.count) return result[0]; } + + async getCurrentAndPreviousTenureBlockCounts(): Promise<{ + current_burn_block_height: number | null; + current_block_count: number; + previous_burn_block_height: number | null; + previous_block_count: number; + }> { + const result = await this.sql< + { + current_burn_block_height: number | null; + current_block_count: number; + previous_burn_block_height: number | null; + previous_block_count: number; + }[] + >` + WITH current AS ( + SELECT MAX(burn_block_height) AS height FROM blocks + ), + current_count AS ( + SELECT COUNT(*) AS count + FROM blocks + WHERE burn_block_height = (SELECT height FROM current) AND canonical = TRUE + ), + previous AS ( + SELECT DISTINCT burn_block_height AS height + FROM blocks + ORDER BY burn_block_height DESC + LIMIT 1 OFFSET 1 + ), + previous_count AS ( + SELECT COUNT(*) AS count + FROM blocks + WHERE burn_block_height = (SELECT height FROM previous) AND canonical = TRUE + ) + SELECT + (SELECT height FROM current) AS current_burn_block_height, + (SELECT count FROM current_count) AS current_block_count, + (SELECT height FROM previous) AS previous_burn_block_height, + (SELECT count FROM previous_count) AS previous_block_count + `; + return result[0]; + } } diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 64fbf4f45..9ac28c2af 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -635,14 +635,40 @@ function createMessageProcessorQueue(): EventMessageHandler { // Create a promise queue so that only one message is handled at a time. const processorQueue = new PQueue({ concurrency: 1 }); - let eventTimer: prom.Histogram<'event'> | undefined; + let metrics: + | { + eventTimer: prom.Histogram; + blocksInPreviousTenure: prom.Gauge; + previousTenureBurnBlockHeight: prom.Gauge; + blocksInCurrentTenure: prom.Gauge; + currentTenureBurnBlockHeight: prom.Gauge; + } + | undefined; if (isProdEnv) { - eventTimer = new prom.Histogram({ - name: 'stacks_event_ingestion_timers', - help: 'Event ingestion timers', - labelNames: ['event'], - buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes - }); + metrics = { + eventTimer: new prom.Histogram({ + name: 'stacks_event_ingestion_timers', + help: 'Event ingestion timers', + labelNames: ['event'], + buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes + }), + blocksInPreviousTenure: new prom.Gauge({ + name: 'stacks_blocks_in_previous_tenure', + help: 'Number of Stacks blocks produced in previous tenure', + }), + previousTenureBurnBlockHeight: new prom.Gauge({ + name: 'stacks_previous_tenure_burn_block_height', + help: 'Burn block height of previous tenure', + }), + blocksInCurrentTenure: new prom.Gauge({ + name: 'stacks_blocks_in_current_tenure', + help: 'Number of Stacks blocks produced in current tenure', + }), + currentTenureBurnBlockHeight: new prom.Gauge({ + name: 'stacks_current_tenure_burn_block_height', + help: 'Burn block height of current tenure', + }), + }; } const observeEvent = async (event: string, fn: () => Promise) => { @@ -651,7 +677,7 @@ function createMessageProcessorQueue(): EventMessageHandler { await fn(); } finally { const elapsedMs = timer.getElapsed(); - eventTimer?.observe({ event }, elapsedMs); + metrics?.eventTimer.observe({ event }, elapsedMs); } }; @@ -666,7 +692,16 @@ function createMessageProcessorQueue(): EventMessageHandler { }, handleBlockMessage: (chainId: ChainID, msg: CoreNodeBlockMessage, db: PgWriteStore) => { return processorQueue - .add(() => observeEvent('block', () => handleBlockMessage(chainId, msg, db))) + .add(async () => { + await observeEvent('block', () => handleBlockMessage(chainId, msg, db)); + if (metrics) { + const stats = await db.getCurrentAndPreviousTenureBlockCounts(); + metrics.currentTenureBurnBlockHeight.set(stats.current_burn_block_height ?? 0); + metrics.blocksInCurrentTenure.set(stats.current_block_count); + metrics.previousTenureBurnBlockHeight.set(stats.previous_burn_block_height ?? 0); + metrics.blocksInPreviousTenure.set(stats.previous_block_count); + } + }) .catch(e => { logger.error(e, 'Error processing core node block message'); throw e; From d099c69dff65909592d8067faaa1e481cc312bcc Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 16 Jan 2025 11:05:29 -0600 Subject: [PATCH 2/7] fix: filter canonical --- src/datastore/pg-store.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 0b6773091..4cce77413 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4540,7 +4540,9 @@ export class PgStore extends BasePgStore { }[] >` WITH current AS ( - SELECT MAX(burn_block_height) AS height FROM blocks + SELECT MAX(burn_block_height) AS height + FROM blocks + WHERE canonical = TRUE ), current_count AS ( SELECT COUNT(*) AS count @@ -4550,6 +4552,7 @@ export class PgStore extends BasePgStore { previous AS ( SELECT DISTINCT burn_block_height AS height FROM blocks + WHERE canonical = TRUE ORDER BY burn_block_height DESC LIMIT 1 OFFSET 1 ), From 218c98fdec24297ee269b019ca609f60bb67219e Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 16 Jan 2025 13:59:21 -0600 Subject: [PATCH 3/7] fix: keep only blocks at previous btc block --- src/datastore/pg-store.ts | 48 ++++---------------------------- src/datastore/pg-write-store.ts | 6 ++++ src/event-stream/event-server.ts | 44 ++++++++++------------------- 3 files changed, 26 insertions(+), 72 deletions(-) diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 4cce77413..f8fc8083a 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4525,48 +4525,12 @@ export class PgStore extends BasePgStore { if (result.count) return result[0]; } - async getCurrentAndPreviousTenureBlockCounts(): Promise<{ - current_burn_block_height: number | null; - current_block_count: number; - previous_burn_block_height: number | null; - previous_block_count: number; - }> { - const result = await this.sql< - { - current_burn_block_height: number | null; - current_block_count: number; - previous_burn_block_height: number | null; - previous_block_count: number; - }[] - >` - WITH current AS ( - SELECT MAX(burn_block_height) AS height - FROM blocks - WHERE canonical = TRUE - ), - current_count AS ( - SELECT COUNT(*) AS count - FROM blocks - WHERE burn_block_height = (SELECT height FROM current) AND canonical = TRUE - ), - previous AS ( - SELECT DISTINCT burn_block_height AS height - FROM blocks - WHERE canonical = TRUE - ORDER BY burn_block_height DESC - LIMIT 1 OFFSET 1 - ), - previous_count AS ( - SELECT COUNT(*) AS count - FROM blocks - WHERE burn_block_height = (SELECT height FROM previous) AND canonical = TRUE - ) - SELECT - (SELECT height FROM current) AS current_burn_block_height, - (SELECT count FROM current_count) AS current_block_count, - (SELECT height FROM previous) AS previous_burn_block_height, - (SELECT count FROM previous_count) AS previous_block_count + async getStacksBlockCountAtBurnBlock(burnBlockHeight: number): Promise { + const result = await this.sql<{ count: number }[]>` + SELECT COUNT(*) AS count + FROM blocks + WHERE burn_block_height = ${burnBlockHeight} AND canonical = TRUE `; - return result[0]; + return result[0].count ?? 0; } } diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index bf4691e53..156c69504 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -1780,6 +1780,12 @@ export class PgWriteStore extends PgStore { }); } + async updateBurnChainBlockHeight(args: { blockHeight: number }): Promise { + await this.sql` + UPDATE chain_tip SET burn_block_height = GREATEST(${args.blockHeight}, burn_block_height) + `; + } + async insertSlotHoldersBatch(sql: PgSqlClient, slotHolders: DbRewardSlotHolder[]): Promise { const slotValues: RewardSlotHolderInsertValues[] = slotHolders.map(slot => ({ canonical: true, diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 9ac28c2af..d316cbeff 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -130,6 +130,7 @@ async function handleBurnBlockMessage( burnchainBlockHeight: burnBlockMsg.burn_block_height, slotHolders: slotHolders, }); + await db.updateBurnChainBlockHeight({ blockHeight: burnBlockMsg.burn_block_height }); } async function handleMempoolTxsMessage(rawTxs: string[], db: PgWriteStore): Promise { @@ -638,10 +639,7 @@ function createMessageProcessorQueue(): EventMessageHandler { let metrics: | { eventTimer: prom.Histogram; - blocksInPreviousTenure: prom.Gauge; - previousTenureBurnBlockHeight: prom.Gauge; - blocksInCurrentTenure: prom.Gauge; - currentTenureBurnBlockHeight: prom.Gauge; + blocksInPreviousBurnBlock: prom.Gauge; } | undefined; if (isProdEnv) { @@ -652,21 +650,9 @@ function createMessageProcessorQueue(): EventMessageHandler { labelNames: ['event'], buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes }), - blocksInPreviousTenure: new prom.Gauge({ - name: 'stacks_blocks_in_previous_tenure', - help: 'Number of Stacks blocks produced in previous tenure', - }), - previousTenureBurnBlockHeight: new prom.Gauge({ - name: 'stacks_previous_tenure_burn_block_height', - help: 'Burn block height of previous tenure', - }), - blocksInCurrentTenure: new prom.Gauge({ - name: 'stacks_blocks_in_current_tenure', - help: 'Number of Stacks blocks produced in current tenure', - }), - currentTenureBurnBlockHeight: new prom.Gauge({ - name: 'stacks_current_tenure_burn_block_height', - help: 'Burn block height of current tenure', + blocksInPreviousBurnBlock: new prom.Gauge({ + name: 'stacks_blocks_in_previous_burn_block', + help: 'Number of Stacks blocks produced in the previous burn block', }), }; } @@ -692,16 +678,7 @@ function createMessageProcessorQueue(): EventMessageHandler { }, handleBlockMessage: (chainId: ChainID, msg: CoreNodeBlockMessage, db: PgWriteStore) => { return processorQueue - .add(async () => { - await observeEvent('block', () => handleBlockMessage(chainId, msg, db)); - if (metrics) { - const stats = await db.getCurrentAndPreviousTenureBlockCounts(); - metrics.currentTenureBurnBlockHeight.set(stats.current_burn_block_height ?? 0); - metrics.blocksInCurrentTenure.set(stats.current_block_count); - metrics.previousTenureBurnBlockHeight.set(stats.previous_burn_block_height ?? 0); - metrics.blocksInPreviousTenure.set(stats.previous_block_count); - } - }) + .add(() => observeEvent('block', () => handleBlockMessage(chainId, msg, db))) .catch(e => { logger.error(e, 'Error processing core node block message'); throw e; @@ -721,7 +698,14 @@ function createMessageProcessorQueue(): EventMessageHandler { }, handleBurnBlock: (msg: CoreNodeBurnBlockMessage, db: PgWriteStore) => { return processorQueue - .add(() => observeEvent('burn_block', () => handleBurnBlockMessage(msg, db))) + .add(async () => { + await observeEvent('burn_block', () => handleBurnBlockMessage(msg, db)); + if (metrics) { + metrics.blocksInPreviousBurnBlock.set( + await db.getStacksBlockCountAtBurnBlock(msg.burn_block_height - 1) + ); + } + }) .catch(e => { logger.error(e, 'Error processing core node burn block message'); throw e; From 2f0d3e9ec6ab9317c18b837640a1bee1d1719a1e Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 16 Jan 2025 14:20:26 -0600 Subject: [PATCH 4/7] fix: empty gauge --- src/datastore/pg-store.ts | 4 ++-- src/event-stream/event-server.ts | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index f8fc8083a..8e38a458d 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4526,11 +4526,11 @@ export class PgStore extends BasePgStore { } async getStacksBlockCountAtBurnBlock(burnBlockHeight: number): Promise { - const result = await this.sql<{ count: number }[]>` + const result = await this.sql<{ count: string }[]>` SELECT COUNT(*) AS count FROM blocks WHERE burn_block_height = ${burnBlockHeight} AND canonical = TRUE `; - return result[0].count ?? 0; + return parseInt(result[0].count ?? '0'); } } diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index d316cbeff..7ef1e82dd 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -655,6 +655,7 @@ function createMessageProcessorQueue(): EventMessageHandler { help: 'Number of Stacks blocks produced in the previous burn block', }), }; + metrics.blocksInPreviousBurnBlock.remove(); } const observeEvent = async (event: string, fn: () => Promise) => { From 222a671434f30245e45cf93965765ce3eaaea5c7 Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Thu, 16 Jan 2025 14:22:36 -0600 Subject: [PATCH 5/7] fix: empty case --- src/datastore/pg-store.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 8e38a458d..aa9bf490d 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4531,6 +4531,6 @@ export class PgStore extends BasePgStore { FROM blocks WHERE burn_block_height = ${burnBlockHeight} AND canonical = TRUE `; - return parseInt(result[0].count ?? '0'); + return parseInt(result[0]?.count ?? '0'); } } From 318a4c3817677c3c0c103db1d9f441b0a50f7abb Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Fri, 17 Jan 2025 08:58:35 -0600 Subject: [PATCH 6/7] fix: update only on collect --- src/datastore/pg-store.ts | 4 ++-- src/event-stream/event-server.ts | 17 ++++++----------- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index aa9bf490d..cd1563733 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4525,11 +4525,11 @@ export class PgStore extends BasePgStore { if (result.count) return result[0]; } - async getStacksBlockCountAtBurnBlock(burnBlockHeight: number): Promise { + async getStacksBlockCountAtPreviousBurnBlock(): Promise { const result = await this.sql<{ count: string }[]>` SELECT COUNT(*) AS count FROM blocks - WHERE burn_block_height = ${burnBlockHeight} AND canonical = TRUE + WHERE burn_block_height = (SELECT burn_block_height - 1 FROM chain_tip) AND canonical = TRUE `; return parseInt(result[0]?.count ?? '0'); } diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 7ef1e82dd..5adacf835 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -632,7 +632,7 @@ interface EventMessageHandler { handleNewAttachment(msg: CoreNodeAttachmentMessage[], db: PgWriteStore): Promise | void; } -function createMessageProcessorQueue(): EventMessageHandler { +function createMessageProcessorQueue(db: PgWriteStore): EventMessageHandler { // Create a promise queue so that only one message is handled at a time. const processorQueue = new PQueue({ concurrency: 1 }); @@ -653,9 +653,11 @@ function createMessageProcessorQueue(): EventMessageHandler { blocksInPreviousBurnBlock: new prom.Gauge({ name: 'stacks_blocks_in_previous_burn_block', help: 'Number of Stacks blocks produced in the previous burn block', + collect: async () => { + metrics?.blocksInPreviousBurnBlock.set(await db.getStacksBlockCountAtPreviousBurnBlock()); + }, }), }; - metrics.blocksInPreviousBurnBlock.remove(); } const observeEvent = async (event: string, fn: () => Promise) => { @@ -699,14 +701,7 @@ function createMessageProcessorQueue(): EventMessageHandler { }, handleBurnBlock: (msg: CoreNodeBurnBlockMessage, db: PgWriteStore) => { return processorQueue - .add(async () => { - await observeEvent('burn_block', () => handleBurnBlockMessage(msg, db)); - if (metrics) { - metrics.blocksInPreviousBurnBlock.set( - await db.getStacksBlockCountAtBurnBlock(msg.burn_block_height - 1) - ); - } - }) + .add(() => observeEvent('burn_block', () => handleBurnBlockMessage(msg, db))) .catch(e => { logger.error(e, 'Error processing core node burn block message'); throw e; @@ -758,7 +753,7 @@ export async function startEventServer(opts: { serverPort?: number; }): Promise { const db = opts.datastore; - const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(); + const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(db); let eventHost = opts.serverHost ?? process.env['STACKS_CORE_EVENT_HOST']; const eventPort = opts.serverPort ?? parseInt(process.env['STACKS_CORE_EVENT_PORT'] ?? '', 10); From d8133b1eac8fc05f2fdd29158398f906c94baafa Mon Sep 17 00:00:00 2001 From: Rafael Cardenas Date: Fri, 17 Jan 2025 09:34:13 -0600 Subject: [PATCH 7/7] style: this --- src/event-stream/event-server.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 5adacf835..d1da30b23 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -653,8 +653,8 @@ function createMessageProcessorQueue(db: PgWriteStore): EventMessageHandler { blocksInPreviousBurnBlock: new prom.Gauge({ name: 'stacks_blocks_in_previous_burn_block', help: 'Number of Stacks blocks produced in the previous burn block', - collect: async () => { - metrics?.blocksInPreviousBurnBlock.set(await db.getStacksBlockCountAtPreviousBurnBlock()); + async collect() { + this.set(await db.getStacksBlockCountAtPreviousBurnBlock()); }, }), };