diff --git a/analyzer/block/block.go b/analyzer/block/block.go index cdfb3ab92..884c3cd06 100644 --- a/analyzer/block/block.go +++ b/analyzer/block/block.go @@ -208,6 +208,7 @@ func (b *blockBasedAnalyzer) isBlockProcessedBySlowSync(ctx context.Context, hei // that that assumption is valid even in the face of misconfigured past runs, e.g. if we // processed the range [1000, 2000] but now want to process [1, infinity). func (b *blockBasedAnalyzer) softEnqueueGapsInProcessedBlocks(ctx context.Context) error { + b.logger.Info("fast-sync: ensuring that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To) batch := &storage.QueryBatch{} batch.Queue( queries.SoftEnqueueGapsInProcessedBlocks, @@ -219,7 +220,7 @@ func (b *blockBasedAnalyzer) softEnqueueGapsInProcessedBlocks(ctx context.Contex b.logger.Error("failed to soft-enqueue gaps in already-processed blocks", "err", err, "from", b.blockRange.From, "to", b.blockRange.To) return err } - b.logger.Info("ensured that any gaps in the already-processed block range can be picked up later", "from", b.blockRange.From, "to", b.blockRange.To) + b.logger.Info("any potential gaps resolved", "from", b.blockRange.From, "to", b.blockRange.To) return nil } diff --git a/analyzer/block/block_test.go b/analyzer/block/block_test.go index 850563def..ef31889fe 100644 --- a/analyzer/block/block_test.go +++ b/analyzer/block/block_test.go @@ -449,7 +449,55 @@ func TestFinalizeFastSync(t *testing.T) { p = &mockProcessor{name: "consensus", storage: db} setupAnalyzer(t, db, p, &config.BlockBasedAnalyzerConfig{From: 21, To: 30}, analyzer.SlowSyncMode).Start(ctx) require.Nil(t, p.fastSyncFinalizedAt, - "sencond slow-sync analyzer should not finalize fast-sync because its range extends an existing slow-sync-analyzed range") + "second slow-sync analyzer should not finalize fast-sync because its range extends an existing slow-sync-analyzed range") +} + +// Tests the `SoftEnqueueGapsInProcessedBlocks` query. +func TestSoftEqueueGaps(t *testing.T) { + ctx := context.Background() + db := setupDB(t) + + // Returns a sorted list of all heights that have an entry in the processed_blocks table (even if not completed). + getHeights := func() []uint64 { + rows, err := db.Query(ctx, "SELECT height FROM analysis.processed_blocks WHERE analyzer = $1 ORDER BY height", "consensus") + require.NoError(t, err) + heights := []uint64{} + for rows.Next() { + var height uint64 + require.NoError(t, rows.Scan(&height)) + heights = append(heights, height) + } + return heights + } + + // Inserts a row into the processed_blocks table. + markAsProcessed := func(analyzer string, height uint64) { + batch := storage.QueryBatch{} + batch.Queue("INSERT INTO analysis.processed_blocks (analyzer, height, locked_time) values ($1, $2, '-infinity')", analyzer, height) + require.NoError(t, db.SendBatch(ctx, &batch)) + } + + // Runs the query that we're testing. + enqueueGaps := func(from, to int64) { + batch := &storage.QueryBatch{} + batch.Queue(queries.SoftEnqueueGapsInProcessedBlocks, "consensus", from, to) + require.NoError(t, db.SendBatch(ctx, batch)) + } + + // Sanity check our helper methods. + markAsProcessed("some_other_analyzer", 10) // to test that queries are scoped by analyzer + require.Equal(t, []uint64{}, getHeights()) + + // Pretend some blocks are already processed. + markAsProcessed("consensus", 3) + markAsProcessed("consensus", 4) + markAsProcessed("consensus", 6) + require.Equal(t, []uint64{3, 4, 6}, getHeights()) + + // Fill the gaps. + // NOTE: Only the gaps. Heights above the highest-processed-so-far (i.e. 6) are not enqueued. + enqueueGaps(1, 10) + require.Equal(t, []uint64{1, 2, 3, 4, 5, 6}, getHeights()) } func TestRefuseSlowSyncOnDirtyRange(t *testing.T) { diff --git a/analyzer/queries/queries.go b/analyzer/queries/queries.go index 5e6783d69..f062e404c 100644 --- a/analyzer/queries/queries.go +++ b/analyzer/queries/queries.go @@ -150,11 +150,17 @@ var ( SELECT COALESCE(max(height), -1) as height FROM analysis.processed_blocks WHERE analyzer = $1 + ), + gaps AS ( + SELECT h + FROM highest_encountered_block, generate_series(GREATEST(1, $2::bigint), LEAST(highest_encountered_block.height, $3::bigint)) AS h + EXCEPT + SELECT height FROM analysis.processed_blocks WHERE analyzer = $1 ) INSERT INTO analysis.processed_blocks (analyzer, height, locked_time) SELECT $1, h, '-infinity'::timestamptz - FROM highest_encountered_block, generate_series(GREATEST(1, $2::bigint), LEAST(highest_encountered_block.height, $3::bigint)) AS h + FROM gaps ON CONFLICT (analyzer, height) DO NOTHING` IndexingProgress = `