From ddb9f6a6b57d6300b9d52b1d674d979dd8d8dd1d Mon Sep 17 00:00:00 2001 From: Mitja T Date: Tue, 7 Nov 2023 05:55:48 +0100 Subject: [PATCH 1/4] analyzer/block: more efficient db query on startup --- analyzer/queries/queries.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index 5e6783d69..f062e404c 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -150,11 +150,17 @@ var ( SELECT COALESCE(max(height), -1) as height FROM analysis.processed_blocks WHERE analyzer = $1 + ), + gaps AS ( + SELECT h + FROM highest_encountered_block, generate_series(GREATEST(1, $2::bigint), LEAST(highest_encountered_block.height, $3::bigint)) AS h + EXCEPT + SELECT height FROM analysis.processed_blocks WHERE analyzer = $1 ) INSERT INTO analysis.processed_blocks (analyzer, height, locked_time) SELECT $1, h, '-infinity'::timestamptz - FROM highest_encountered_block, generate_series(GREATEST(1, $2::bigint), LEAST(highest_encountered_block.height, $3::bigint)) AS h + FROM gaps ON CONFLICT (analyzer, height) DO NOTHING` IndexingProgress = ` From e12368c214d51c3b367a5bf063c8ebdc18c36724 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Tue, 7 Nov 2023 05:55:56 +0100 Subject: [PATCH 2/4] typo --- analyzer/block/block_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/analyzer/block/block_test.go b/analyzer/block/block_test.go index 850563def..64c4498ad 100644 --- a/analyzer/block/block_test.go +++ b/analyzer/block/block_test.go @@ -449,7 +449,7 @@ func TestFinalizeFastSync(t *testing.T) { p = &mockProcessor{name: "consensus", storage: db} setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 21, To: 30}, analyzer.SlowSyncMode).Start(ctx) require.Nil(t, p.fastSyncFinalizedAt, - "sencond slow-sync analyzer should not finalize fast-sync because its range extends an existing slow-sync-analyzed range") + "second slow-sync analyzer should not finalize fast-sync because its range extends an existing slow-sync-analyzed range") } func TestRefuseSlowSyncOnDirtyRange(t *testing.T) { From 2a234a7218b9f7ad10ad7bbc8fa647dbfc3ee899 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Tue, 7 Nov 2023 06:36:17 +0100 Subject: [PATCH 3/4] add test --- analyzer/block/block_test.go | 48 ++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/analyzer/block/block_test.go b/analyzer/block/block_test.go index 64c4498ad..ef31889fe 100644 --- a/analyzer/block/block_test.go +++ b/analyzer/block/block_test.go @@ -452,6 +452,54 @@ func TestFinalizeFastSync(t *testing.T) { "second slow-sync analyzer should not finalize fast-sync because its range extends an existing slow-sync-analyzed range") } +// Tests the `SoftEnqueueGapsInProcessedBlocks` query. +func TestSoftEqueueGaps(t *testing.T) { + ctx := context.Background() + db := setupDB(t) + + // Returns a sorted list of all heights that have an entry in the processed_blocks table (even if not completed). + getHeights := func() []uint64 { + rows, err := db.Query(ctx, "SELECT height FROM analysis.processed_blocks WHERE analyzer = $1 ORDER BY height", "consensus") + require.NoError(t, err) + heights := []uint64{} + for rows.Next() { + var height uint64 + require.NoError(t, rows.Scan(&height)) + heights = append(heights, height) + } + return heights + } + + // Inserts a row into the processed_blocks table. + markAsProcessed := func(analyzer string, height uint64) { + batch := storage.QueryBatch{} + batch.Queue("INSERT INTO analysis.processed_blocks (analyzer, height, locked_time) values ($1, $2, '-infinity')", analyzer, height) + require.NoError(t, db.SendBatch(ctx, &batch)) + } + + // Runs the query that we're testing. + enqueueGaps := func(from, to int64) { + batch := &storage.QueryBatch{} + batch.Queue(queries.SoftEnqueueGapsInProcessedBlocks, "consensus", from, to) + require.NoError(t, db.SendBatch(ctx, batch)) + } + + // Sanity check our helper methods. + markAsProcessed("some_other_analyzer", 10) // to test that queries are scoped by analyzer + require.Equal(t, []uint64{}, getHeights()) + + // Pretend some blocks are already processed. + markAsProcessed("consensus", 3) + markAsProcessed("consensus", 4) + markAsProcessed("consensus", 6) + require.Equal(t, []uint64{3, 4, 6}, getHeights()) + + // Fill the gaps. + // NOTE: Only the gaps. Heights above the highest-processed-so-far (i.e. 6) are not enqueued. + enqueueGaps(1, 10) + require.Equal(t, []uint64{1, 2, 3, 4, 5, 6}, getHeights()) +} + func TestRefuseSlowSyncOnDirtyRange(t *testing.T) { // Test that slow-sync analyzer won't start if the already-analyzed block range is non-contiguous. ctx := context.Background() From ea2fa15e34a62dbe322ef3ef093af8922341dfa9 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Tue, 7 Nov 2023 07:04:07 +0100 Subject: [PATCH 4/4] rearrange logging so startup wait is more informed --- analyzer/block/block.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/analyzer/block/block.go b/analyzer/block/block.go index cdfb3ab92..884c3cd06 100644 --- a/analyzer/block/block.go +++ b/analyzer/block/block.go @@ -208,6 +208,7 @@ func (b *blockBasedAnalyzer) isBlockProcessedBySlowSync(ctx context.Context, hei // that that assumption is valid even in the face of misconfigured past runs, e.g. if we // processed the range [1000, 2000] but now want to process [1, infinity). func (b *blockBasedAnalyzer) softEnqueueGapsInProcessedBlocks(ctx context.Context) error { + b.logger.Info("fast-sync: ensuring that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To) batch := &storage.QueryBatch{} batch.Queue( queries.SoftEnqueueGapsInProcessedBlocks, @@ -219,7 +220,7 @@ func (b *blockBasedAnalyzer) softEnqueueGapsInProcessedBlocks(ctx context.Contex b.logger.Error("failed to soft-enqueue gaps in already-processed blocks", "err", err, "from", b.blockRange.From, "to", b.blockRange.To) return err } - b.logger.Info("ensured that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To) + b.logger.Info("any potential gaps resolved", "from", b.blockRange.From, "to", b.blockRange.To) return nil }