From 58e561b86e197fd667afffb1bc112bb536346f35 Mon Sep 17 00:00:00 2001
From: Dmytro Haidashenko <dmytro.haidashenko@smartcontract.com>
Date: Thu, 16 Jan 2025 13:16:58 +0100
Subject: [PATCH 1/2] applied patch from CL repo

---
 core/chains/evm/logpoller/log_poller.go       | 258 ++++++++++++------
 .../evm/logpoller/log_poller_internal_test.go | 199 +++++++++++---
 core/chains/evm/logpoller/log_poller_test.go  |  18 +-
 core/chains/evm/logpoller/orm.go              |  13 +
 core/chains/evm/logpoller/orm_test.go         |  32 +++
 5 files changed, 390 insertions(+), 130 deletions(-)

diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index dd7e0c5242..8b2c3bfdef 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -75,7 +75,7 @@ type LogPoller interface {
 type LogPollerTest interface {
 	LogPoller
 	PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
-	BackupPollAndSaveLogs(ctx context.Context)
+	BackupPollAndSaveLogs(ctx context.Context) error
 	Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
 	GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
 	PruneOldBlocks(ctx context.Context) (bool, error)
@@ -407,6 +407,12 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
 	defer func() {
 		if errors.Is(err, context.Canceled) {
 			err = ErrReplayRequestAborted
+		} else if errors.Is(err, commontypes.ErrFinalityViolated) {
+			// Replay only declares finality violation and does not resolve it, as it's possible that [fromBlock, savedFinalizedBlockNumber]
+			// does not contain the violation.
+			lp.lggr.Criticalw("Replay failed due to finality violation", "fromBlock", fromBlock, "err", err)
+			lp.finalityViolated.Store(true)
+			lp.SvcErrBuffer.Append(err)
 		}
 	}()
 
@@ -625,7 +631,17 @@ func (lp *logPoller) run() {
 				lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
 				continue
 			}
-			lp.BackupPollAndSaveLogs(ctx)
+			err := lp.BackupPollAndSaveLogs(ctx)
+			switch {
+			case errors.Is(err, commontypes.ErrFinalityViolated):
+				// BackupPoll only declares finality violation and does not resolve it, as it's possible that processed range
+				// does not contain the violation.
+				lp.lggr.Criticalw("Backup poll failed due to finality violation", "err", err)
+				lp.finalityViolated.Store(true)
+				lp.SvcErrBuffer.Append(err)
+			case err != nil:
+				lp.lggr.Errorw("Backup poller failed, retrying later", "err", err)
+			}
 		}
 	}
 }
@@ -695,16 +711,16 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64
 	}
 }
 
-func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
+func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) error {
 	if lp.backupPollerNextBlock == 0 {
 		lastProcessed, err := lp.orm.SelectLatestBlock(ctx)
 		if err != nil {
 			if pkgerrors.Is(err, sql.ErrNoRows) {
 				lp.lggr.Warnw("Backup log poller ran before first successful log poller run, skipping")
-			} else {
-				lp.lggr.Errorw("Backup log poller unable to get starting block", "err", err)
+				return nil
 			}
-			return
+
+			return fmt.Errorf("unable to get starting block: %w", err)
 		}
 		// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-backupPollerBlockDelay)
 		backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
@@ -715,7 +731,7 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
 	_, latestFinalizedBlockNumber, err := lp.latestBlocks(ctx)
 	if err != nil {
 		lp.lggr.Warnw("Backup logpoller failed to get latest block", "err", err)
-		return
+		return nil
 	}
 
 	lastSafeBackfillBlock := latestFinalizedBlockNumber - 1
@@ -724,12 +740,13 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
 		if err = lp.backfill(ctx, lp.backupPollerNextBlock, lastSafeBackfillBlock); err != nil {
 			// If there's an error backfilling, we can just return and retry from the last block saved
 			// since we don't save any blocks on backfilling. We may re-insert the same logs but thats ok.
-			lp.lggr.Warnw("Backup poller failed", "err", err)
-			return
+			return fmt.Errorf("backfill failed: %w", err)
 		}
 		lp.lggr.Infow("Backup poller finished backfilling", "start", lp.backupPollerNextBlock, "end", lastSafeBackfillBlock)
 		lp.backupPollerNextBlock = lastSafeBackfillBlock + 1
 	}
+
+	return nil
 }
 
 // convertLogs converts an array of geth logs ([]type.Log) to an array of logpoller logs ([]Log)
@@ -778,17 +795,32 @@ func convertTopics(topics []common.Hash) [][]byte {
 	return topicsForDB
 }
 
-// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
+// blocksFromFinalizedLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
 // whether or not there are any logs in the list from that block
-func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
-	var numbers []uint64
+func (lp *logPoller) blocksFromFinalizedLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
+	numbers := make([]uint64, 0, len(logs))
 	for _, log := range logs {
 		numbers = append(numbers, log.BlockNumber)
 	}
 	if numbers[len(numbers)-1] != endBlockNumber {
 		numbers = append(numbers, endBlockNumber)
 	}
-	return lp.GetBlocksRange(ctx, numbers)
+	blocks, err = lp.GetBlocksRange(ctx, numbers)
+	if err != nil {
+		return nil, err
+	}
+
+	if len(logs) == 0 {
+		return blocks, nil
+	}
+
+	for i, log := range logs {
+		if log.BlockHash != blocks[i].BlockHash {
+			return nil, fmt.Errorf("finalized log produced by tx %s has block hash %s that does not match fetched block's hash %s: %w", log.TxHash, log.BlockHash, blocks[i].BlockHash, commontypes.ErrFinalityViolated)
+		}
+	}
+
+	return blocks, nil
 }
 
 // backfill will query FilterLogs in batches for logs in the
@@ -819,7 +851,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 		if len(gethLogs) == 0 {
 			continue
 		}
-		blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to))
+		blocks, err := lp.blocksFromFinalizedLogs(ctx, gethLogs, uint64(to)) //nolint:gosec // G115
 		if err != nil {
 			return err
 		}
@@ -849,11 +881,6 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 // 2. Delete all logs and blocks after the LCA
 // 3. Return the LCA+1, i.e. our new current (unprocessed) block.
 func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
-	defer func() {
-		if err == nil {
-			lp.finalityViolated.Store(false)
-		}
-	}()
 
 	var err1 error
 	if currentBlock == nil {
@@ -884,7 +911,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 	// We will not have the previous currentBlock on initial poll.
 	havePreviousBlock := err1 == nil
 	if !havePreviousBlock {
-		lp.lggr.Infow("Do not have previous block, first poll ever on new chain or after backfill", "currentBlockNumber", currentBlockNumber)
+		lp.lggr.Infow("Do not have previous block, first poll ever on new chain", "currentBlockNumber", currentBlockNumber)
 		return currentBlock, nil
 	}
 	// Check for reorg.
@@ -894,8 +921,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 		// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
 		blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber)
 		if err2 != nil {
-			lp.lggr.Warnw("Unable to find LCA after reorg, retrying", "err", err2)
-			return nil, pkgerrors.New("Unable to find LCA after reorg, retrying")
+			return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
 		}
 
 		lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber)
@@ -922,20 +948,40 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal
 // conditions this would be equal to lastProcessed.BlockNumber + 1.
 func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) {
+	err := lp.pollAndSaveLogs(ctx, currentBlockNumber)
+	if errors.Is(err, commontypes.ErrFinalityViolated) {
+		lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err)
+		lp.finalityViolated.Store(true)
+		lp.SvcErrBuffer.Append(err)
+		return
+	}
+
+	if err != nil {
+		lp.lggr.Errorw("Failed to poll and save logs, retrying later", "err", err)
+		return
+	}
+
+	if lp.finalityViolated.Load() {
+		lp.lggr.Info("PollAndSaveLogs completed successfully - removing finality violation flag")
+		lp.finalityViolated.Store(false)
+	}
+}
+
+func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) {
 	lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber)
 	// Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks()
 	// latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration
 	latestBlock, latestFinalizedBlockNumber, err := lp.latestBlocks(ctx)
 	if err != nil {
 		lp.lggr.Warnw("Unable to get latestBlockNumber block", "err", err, "currentBlockNumber", currentBlockNumber)
-		return
+		return nil
 	}
 	latestBlockNumber := latestBlock.Number
 	if currentBlockNumber > latestBlockNumber {
 		// Note there can also be a reorg "shortening" i.e. chain height decreases but TDD increases. In that case
 		// we also just wait until the new tip is longer and then detect the reorg.
 		lp.lggr.Debugw("No new blocks since last poll", "currentBlockNumber", currentBlockNumber, "latestBlockNumber", latestBlockNumber)
-		return
+		return nil
 	}
 	var currentBlock *evmtypes.Head
 	if currentBlockNumber == latestBlockNumber {
@@ -948,8 +994,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 	if err != nil {
 		// If there's an error handling the reorg, we can't be sure what state the db was left in.
 		// Resume from the latest block saved and retry.
-		lp.lggr.Errorw("Unable to get current block, retrying", "err", err)
-		return
+		return fmt.Errorf("unable to get current block: %w", err)
 	}
 	currentBlockNumber = currentBlock.Number
 
@@ -964,8 +1009,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		if err = lp.backfill(ctx, currentBlockNumber, lastSafeBackfillBlock); err != nil {
 			// If there's an error backfilling, we can just return and retry from the last block saved
 			// since we don't save any blocks on backfilling. We may re-insert the same logs but thats ok.
-			lp.lggr.Warnw("Unable to backfill finalized logs, retrying later", "err", err)
-			return
+			return fmt.Errorf("failed to backfill finalized logs: %w", err)
 		}
 		currentBlockNumber = lastSafeBackfillBlock + 1
 	}
@@ -976,8 +1020,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 			if err != nil {
 				// If there's an error handling the reorg, we can't be sure what state the db was left in.
 				// Resume from the latest block saved.
-				lp.lggr.Errorw("Unable to get current block", "err", err)
-				return
+				return fmt.Errorf("failed to get current block: %w", err)
 			}
 			currentBlockNumber = currentBlock.Number
 		}
@@ -998,7 +1041,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 		)
 		if err != nil {
 			lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
-			return
+			return nil
 		}
 		// Update current block.
 		// Same reorg detection on unfinalized blocks.
@@ -1007,6 +1050,8 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
 			break
 		}
 	}
+
+	return nil
 }
 
 // Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker
@@ -1042,12 +1087,14 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 	// We loop via parent instead of current so current always holds the LCA+1.
 	// If the parent block number becomes < the first finalized block our reorg is too deep.
 	// This can happen only if finalityTag is not enabled and fixed finalityDepth is provided via config.
+	var ourParentBlockHash common.Hash
 	for parent.Number >= latestFinalizedBlockNumber {
-		ourParentBlockHash, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
+		outParentBlock, err := lp.orm.SelectBlockByNumber(ctx, parent.Number)
 		if err != nil {
 			return nil, err
 		}
-		if parent.Hash == ourParentBlockHash.BlockHash {
+		ourParentBlockHash = outParentBlock.BlockHash
+		if parent.Hash == ourParentBlockHash {
 			// If we do have the blockhash, return blockAfterLCA
 			return blockAfterLCA, nil
 		}
@@ -1058,11 +1105,9 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 			return nil, err
 		}
 	}
+
 	lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
-	rerr := pkgerrors.New("Reorg greater than finality depth")
-	lp.SvcErrBuffer.Append(rerr)
-	lp.finalityViolated.Store(true)
-	return nil, rerr
+	return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
 }
 
 // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
@@ -1256,11 +1301,10 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
 	blocksRequested map[uint64]struct{},
 	blocksFound map[uint64]LogPollerBlock,
 ) (map[uint64]LogPollerBlock, error) {
-	var remainingBlocks []string
-
+	remainingBlocks := make([]uint64, 0, len(blocksRequested))
 	for num := range blocksRequested {
 		if _, ok := blocksFound[num]; !ok {
-			remainingBlocks = append(remainingBlocks, hexutil.EncodeBig(new(big.Int).SetUint64(num)))
+			remainingBlocks = append(remainingBlocks, num)
 		}
 	}
 
@@ -1304,25 +1348,23 @@ var (
 	finalizedBlock blockValidationType = blockValidationType(rpc.FinalizedBlockNumber.String())
 )
 
-// fetchBlocks fetches a list of blocks in a single batch. validationReq is the string to use for the
+// fetchBlocks fetches a list of blocks in a single batch. finalityValidationReq is the string to use for the
 // additional validation request (either the "finalized" or "latest" string defined in rpc module), which
 // will be used to validate the finality of the other blocks.
-func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, validationReq blockValidationType) (blocks []*evmtypes.Head, err error) {
+// chainReference - is used to verify that fetched blocks belong to the same chain as referenced head.
+func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []uint64, finalityValidationReq blockValidationType, chainReference *LogPollerBlock) (blocks map[uint64]*evmtypes.Head, err error) {
 	n := len(blocksRequested)
-	blocks = make([]*evmtypes.Head, 0, n+1)
-	reqs := make([]rpc.BatchElem, 0, n+1)
+	blocks = make(map[uint64]*evmtypes.Head, n+2)
+	reqs := make([]rpc.BatchElem, 0, n+2)
 
-	validationBlockIndex := n
-	for k, num := range blocksRequested {
-		if num == string(validationReq) {
-			validationBlockIndex = k
-		}
-		reqs = append(reqs, newBlockReq(num))
+	for _, num := range blocksRequested {
+		reqs = append(reqs, newBlockReq(hexutil.EncodeBig(big.NewInt(0).SetUint64(num))))
 	}
 
-	if validationBlockIndex == n {
-		// Add validation req if it wasn't in there already
-		reqs = append(reqs, newBlockReq(string(validationReq)))
+	reqs = append(reqs, newBlockReq(string(finalityValidationReq)))
+
+	if chainReference != nil {
+		reqs = append(reqs, newBlockReq(hexutil.EncodeBig(big.NewInt(chainReference.BlockNumber))))
 	}
 
 	err = lp.ec.BatchCallContext(ctx, reqs)
@@ -1330,64 +1372,128 @@ func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string,
 		return nil, err
 	}
 
-	validationBlock, err := validateBlockResponse(reqs[validationBlockIndex])
+	// ensure that requested blocks belong to the same chain as referenced head
+	if chainReference != nil {
+		var rpcChainReference *evmtypes.Head
+		rpcChainReference, err = validateBlockResponse(reqs[len(reqs)-1])
+		if err != nil {
+			return nil, err
+		}
+
+		if rpcChainReference.Hash != chainReference.BlockHash {
+			return nil, fmt.Errorf("expected RPC's finalized block hash at hegiht %d to be %s but got %s: %w",
+				chainReference.BlockNumber, chainReference.BlockHash, rpcChainReference.Hash, commontypes.ErrFinalityViolated)
+		}
+
+		reqs = reqs[:len(reqs)-1] // no need to include chain reference into results
+	}
+
+	latestFinalized, err := validateBlockResponse(reqs[len(reqs)-1])
 	if err != nil {
 		return nil, err
 	}
-	latestFinalizedBlockNumber := validationBlock.Number
-	if validationReq == latestBlock {
+	latestFinalizedBlockNumber := latestFinalized.Number
+	if finalityValidationReq == latestBlock {
 		// subtract finalityDepth from "latest" to get finalized, when useFinalityTags = false
 		latestFinalizedBlockNumber = mathutil.Max(latestFinalizedBlockNumber-lp.finalityDepth, 0)
 	}
-	if len(reqs) == n+1 {
-		reqs = reqs[:n] // ignore last req if we added it explicitly for validation
-	}
 
-	for k, r := range reqs {
-		if k == validationBlockIndex {
-			// Already validated this one, just insert it in proper place
-			blocks = append(blocks, validationBlock)
-			continue
-		}
+	reqs = reqs[:len(reqs)-1] // no need to include finality validation request into results
 
-		block, err2 := validateBlockResponse(r)
-		if err2 != nil {
-			return nil, err2
+	for i, r := range reqs {
+		block, err := validateBlockResponse(r)
+		if err != nil {
+			return nil, err
 		}
 
 		blockRequested := r.Args[0].(string)
 		if blockRequested != string(latestBlock) && block.Number > latestFinalizedBlockNumber {
 			return nil, fmt.Errorf(
-				"Received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
+				"received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
 				block.Number, latestFinalizedBlockNumber)
 		}
 
-		blocks = append(blocks, block)
+		blocks[blocksRequested[i]] = block
 	}
 	return blocks, nil
 }
 
-func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
-	var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1)
-
+func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []uint64, batchSize int64) (map[uint64]LogPollerBlock, error) {
 	validationReq := finalizedBlock
 	if !lp.useFinalityTag {
 		validationReq = latestBlock
 	}
 
+	chainValidationHead, err := lp.orm.SelectLatestFinalizedBlock(ctx)
+	if err != nil && !errors.Is(err, sql.ErrNoRows) {
+		return nil, fmt.Errorf("failed to fetch latest finalized block from db: %w", err)
+	}
+
+	var logPollerBlocks = make(map[uint64]LogPollerBlock, len(blocksRequested))
 	for i := 0; i < len(blocksRequested); i += int(batchSize) {
 		j := i + int(batchSize)
 		if j > len(blocksRequested) {
 			j = len(blocksRequested)
 		}
-		moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq)
+
+		// As batch requests are not atomic, there is a chance that some of the blocks were replaced due to a reorg once we've observed them.
+		// Example:
+		// 1. RPC's chain is 1,2',3',4',5' (latest finalized is 1).
+		// 2. Batch request reads blocks 1,2'.
+		// 3. RPC updates its state to 1,2,3,4,5 (latest finalized is 5).
+		// 4. Batch request reads 4,5.
+		// As a result, we'll treat block 2' as finalized. To address that, we have to fetch all blocks twice and verify that the results are identical.
+		fetched1, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq, chainValidationHead)
 		if err != nil {
 			return nil, err
 		}
-		blocks = append(blocks, moreBlocks...)
+
+		fetched2, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq, chainValidationHead)
+		if err != nil {
+			return nil, err
+		}
+
+		err = ensureIdenticalBlocksBatches(fetched1, fetched2)
+		if err != nil {
+			return nil, err
+		}
+
+		for _, head := range fetched1 {
+			lpBlock := LogPollerBlock{
+				EvmChainId:           head.EVMChainID,
+				BlockHash:            head.Hash,
+				BlockNumber:          head.Number,
+				BlockTimestamp:       head.Timestamp,
+				FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock()
+				CreatedAt:            head.CreatedAt,
+			}
+			logPollerBlocks[uint64(head.Number)] = lpBlock //nolint:gosec // G115
+			if chainValidationHead == nil || chainValidationHead.BlockNumber < lpBlock.BlockNumber {
+				chainValidationHead = &lpBlock
+			}
+		}
 	}
 
-	return blocks, nil
+	return logPollerBlocks, nil
+}
+
+func ensureIdenticalBlocksBatches(fetched1, fetched2 map[uint64]*evmtypes.Head) error {
+	if len(fetched1) != len(fetched2) {
+		return fmt.Errorf("invariant violation: expected size of batches to be identical. Fetched1: %d, Fetched2: %d", len(fetched1), len(fetched2))
+	}
+
+	for num, head1 := range fetched1 {
+		head2, ok := fetched2[num]
+		if !ok {
+			return fmt.Errorf("invariant violation: expected fetched1 to contain same blocks as fetched2, but %d is missing from fetched2", num)
+		}
+
+		if head1.Hash != head2.Hash {
+			return fmt.Errorf("expected block %d to be finalized but got different hashes %s and %s from RPC: %w", num, head1.Hash, head2.Hash, commontypes.ErrFinalityViolated)
+		}
+	}
+
+	return nil
 }
 
 func validateBlockResponse(r rpc.BatchElem) (*evmtypes.Head, error) {
diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go
index 620bbf14f4..3850ecac0d 100644
--- a/core/chains/evm/logpoller/log_poller_internal_test.go
+++ b/core/chains/evm/logpoller/log_poller_internal_test.go
@@ -22,11 +22,14 @@ import (
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap/zapcore"
 
+	commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
+
+	"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
+
 	"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
 
 	"github.com/smartcontractkit/chainlink-common/pkg/logger"
 	"github.com/smartcontractkit/chainlink-common/pkg/services"
-	"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
 
 	htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
 	evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
@@ -243,7 +246,7 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {
 		BackupPollerBlockDelay:   0,
 	}
 	lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
-	lp.BackupPollAndSaveLogs(ctx)
+	require.NoError(t, lp.BackupPollAndSaveLogs(ctx))
 	assert.Equal(t, int64(0), lp.backupPollerNextBlock)
 	assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())
 
@@ -253,11 +256,28 @@ func TestLogPoller_BackupPollerStartup(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, latestBlock, lastProcessed.BlockNumber)
 
-	lp.BackupPollAndSaveLogs(ctx)
+	require.NoError(t, lp.BackupPollAndSaveLogs(ctx))
 	assert.Equal(t, int64(2), lp.backupPollerNextBlock)
 }
 
 func mockBatchCallContext(t *testing.T, ec *evmclimocks.Client) {
+	mockBatchCallContextWithHead(t, ec, newHeadVal)
+}
+
+func newHeadVal(num int64) evmtypes.Head {
+	return evmtypes.Head{
+		Number:     num,
+		Hash:       common.BigToHash(big.NewInt(num)),
+		ParentHash: common.BigToHash(big.NewInt(num - 1)),
+	}
+}
+
+func newHead(num int64) *evmtypes.Head {
+	r := newHeadVal(num)
+	return &r
+}
+
+func mockBatchCallContextWithHead(t *testing.T, ec *evmclimocks.Client, newHead func(num int64) evmtypes.Head) {
 	ec.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Run(func(args mock.Arguments) {
 		elems := args.Get(1).([]rpc.BatchElem)
 		for _, e := range elems {
@@ -274,7 +294,7 @@ func mockBatchCallContext(t *testing.T, ec *evmclimocks.Client) {
 				num = int64(n)
 			}
 			result := e.Result.(*evmtypes.Head)
-			*result = evmtypes.Head{Number: num, Hash: utils.NewHash()}
+			*result = newHead(num)
 		}
 	})
 }
@@ -289,12 +309,12 @@ func TestLogPoller_Replay(t *testing.T) {
 	orm := NewORM(chainID, db, lggr)
 
 	var head atomic.Pointer[evmtypes.Head]
-	head.Store(&evmtypes.Head{Number: 4})
+	head.Store(newHead(4))
 
 	events := []common.Hash{EmitterABI.Events["Log1"].ID}
 	log1 := types.Log{
 		Index:       0,
-		BlockHash:   common.Hash{},
+		BlockHash:   common.BigToHash(big.NewInt(head.Load().Number)),
 		BlockNumber: uint64(head.Load().Number),
 		Topics:      events,
 		Address:     addr,
@@ -303,6 +323,9 @@ func TestLogPoller_Replay(t *testing.T) {
 	}
 
 	ec := evmclimocks.NewClient(t)
+	ec.EXPECT().HeadByHash(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, hash common.Hash) (*evmtypes.Head, error) {
+		return &evmtypes.Head{Number: hash.Big().Int64(), Hash: hash}, nil
+	}).Maybe()
 	ec.On("HeadByNumber", mock.Anything, mock.Anything).Return(func(context.Context, *big.Int) (*evmtypes.Head, error) {
 		return head.Load(), nil
 	})
@@ -321,7 +344,7 @@ func TestLogPoller_Replay(t *testing.T) {
 
 	headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(func(ctx context.Context) (*evmtypes.Head, *evmtypes.Head, error) {
 		h := head.Load()
-		finalized := &evmtypes.Head{Number: h.Number - lpOpts.FinalityDepth}
+		finalized := newHead(h.Number - lpOpts.FinalityDepth)
 		return h, finalized, nil
 	})
 	lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
@@ -423,7 +446,7 @@ func TestLogPoller_Replay(t *testing.T) {
 
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil).Maybe() // in case task gets delayed by >= 100ms
 
-		head.Store(&evmtypes.Head{Number: 5})
+		head.Store(newHead(5))
 		t.Cleanup(lp.reset)
 		servicetest.Run(t, lp)
 
@@ -450,7 +473,7 @@ func TestLogPoller_Replay(t *testing.T) {
 			go func() {
 				defer close(done)
 
-				head.Store(&evmtypes.Head{Number: 4}) // Restore latest block to 4, so this matches the fromBlock requested
+				head.Store(newHead(4)) // Restore latest block to 4, so this matches the fromBlock requested
 				select {
 				case lp.replayStart <- 4:
 				case <-ctx.Done():
@@ -471,7 +494,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
 
 		t.Cleanup(lp.reset)
-		head.Store(&evmtypes.Head{Number: 5}) // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs()
+		head.Store(newHead(6)) // Latest block must be > lastProcessed in order for SaveAndPollLogs() to call FilterLogs()
 		servicetest.Run(t, lp)
 
 		select {
@@ -492,7 +515,7 @@ func TestLogPoller_Replay(t *testing.T) {
 
 		lp.ReplayAsync(1)
 
-		recvStartReplay(testutils.Context(t), 2)
+		recvStartReplay(testutils.Context(t), 4)
 	})
 
 	t.Run("ReplayAsync error", func(t *testing.T) {
@@ -532,7 +555,7 @@ func TestLogPoller_Replay(t *testing.T) {
 		require.NoError(t, err)
 
 		h := head.Load()
-		err = lp.orm.InsertBlock(ctx, h.Hash, h.Number, h.Timestamp, h.Number)
+		err = lp.orm.InsertBlock(ctx, common.BigToHash(big.NewInt(h.Number)), h.Number, h.Timestamp, h.Number)
 		require.NoError(t, err)
 
 		ec.On("FilterLogs", mock.Anything, mock.Anything).Return([]types.Log{log1}, nil)
@@ -603,25 +626,35 @@ func Test_FetchBlocks(t *testing.T) {
 
 	cases := []struct {
 		name            string
-		blocksRequested []string
+		blocksRequested []uint64
+		chainReference  *LogPollerBlock
 		expectedErr     error
-	}{{
-		"successful validation including finalized and latest",
-		[]string{"0x3", "latest", "0x5", "finalized", "0x1"},
-		nil,
-	}, {
-		"successful validation with all block numbers",
-		[]string{"0x2", "0x5", "0x3", "0x4"},
-		nil,
-	}, {
-		"finality violation including finalized and latest",
-		[]string{"0x8", "0x2", "latest", "finalized"},
-		errors.New("Received unfinalized block 8 while expecting finalized block (latestFinalizedBlockNumber = 5)"),
-	}, {
-		"finality violation with all block numbers",
-		[]string{"0x9", "0x2", "finalized", "latest"},
-		errors.New("Received unfinalized block 9 while expecting finalized block (latestFinalizedBlockNumber = 5)"),
-	}}
+	}{
+		{
+			"All blocks are finalized from RPC's perspective, no reference",
+			[]uint64{2, 5, 3, 4},
+			nil,
+			nil,
+		},
+		{
+			"RPC's latest finalized is lower than request, no reference",
+			[]uint64{8, 2},
+			nil,
+			errors.New("received unfinalized block 8 while expecting finalized block (latestFinalizedBlockNumber = 5)"),
+		},
+		{
+			"All blocks are finalized, but chain reference does not match",
+			[]uint64{2, 5, 3, 4},
+			&LogPollerBlock{BlockNumber: 1, BlockHash: common.BigToHash(big.NewInt(2))},
+			errors.New("expected RPC's finalized block hash at hegiht 1 to be 0x0000000000000000000000000000000000000000000000000000000000000002 but got 0x0000000000000000000000000000000000000000000000000000000000000001: finality violated"),
+		},
+		{
+			"All blocks are finalized and chain reference matches",
+			[]uint64{2, 5, 3, 4},
+			&LogPollerBlock{BlockNumber: 1, BlockHash: common.BigToHash(big.NewInt(1))},
+			nil,
+		},
+	}
 
 	lp := NewLogPoller(orm, ec, lggr, nil, lpOpts)
 	for _, tc := range cases {
@@ -631,29 +664,111 @@ func Test_FetchBlocks(t *testing.T) {
 				blockValidationReq = finalizedBlock
 			}
 			t.Run(fmt.Sprintf("%s where useFinalityTag=%t", tc.name, lp.useFinalityTag), func(t *testing.T) {
-				blocks, err := lp.fetchBlocks(ctx, tc.blocksRequested, blockValidationReq)
+				blocks, err := lp.fetchBlocks(ctx, tc.blocksRequested, blockValidationReq, tc.chainReference)
 				if tc.expectedErr != nil {
 					require.Equal(t, err.Error(), tc.expectedErr.Error())
 					return // PASS
 				}
 				require.NoError(t, err)
-				for i, blockRequested := range tc.blocksRequested {
-					switch blockRequested {
-					case string(latestBlock):
-						assert.Equal(t, int64(8), blocks[i].Number)
-					case string(finalizedBlock):
-						assert.Equal(t, int64(5), blocks[i].Number)
-					default:
-						blockNum, err2 := hexutil.DecodeUint64(blockRequested)
-						require.NoError(t, err2)
-						assert.Equal(t, int64(blockNum), blocks[i].Number)
-					}
+				for _, blockRequested := range tc.blocksRequested {
+					assert.Equal(t, blockRequested, uint64(blocks[blockRequested].Number)) //nolint:gosec // G115
 				}
 			})
 		}
 	}
 }
 
+func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
+	t.Parallel()
+
+	db := pgtest.NewSqlxDB(t)
+	lpOpts := Opts{
+		PollPeriod:               time.Second,
+		FinalityDepth:            3,
+		BackfillBatchSize:        3,
+		RpcBatchSize:             3,
+		KeepFinalizedBlocksDepth: 20,
+		BackupPollerBlockDelay:   0,
+	}
+	t.Run("Finalized DB block is not present in RPC's chain", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).RunAndReturn(func(ctx context.Context) (*evmtypes.Head, *evmtypes.Head, error) {
+			return latest, finalized, nil
+		}).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5}}, nil).Once()
+		mockBatchCallContext(t, ec)
+		// insert finalized block with different hash than in RPC
+		require.NoError(t, orm.InsertBlock(tests.Context(t), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2))
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
+	})
+	t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5}}, nil).Once()
+		mockBatchCallContextWithHead(t, ec, func(num int64) evmtypes.Head {
+			// return new hash for every call
+			return evmtypes.Head{Number: num, Hash: utils.NewHash()}
+		})
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
+	})
+	t.Run("Log's hash does not match block's", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		orm := NewORM(testutils.NewRandomEVMChainID(), db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once()
+		mockBatchCallContext(t, ec)
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
+	})
+	t.Run("Happy path", func(t *testing.T) {
+		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
+		chainID := testutils.NewRandomEVMChainID()
+		orm := NewORM(chainID, db, lggr)
+		headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t)
+		finalized := newHead(5)
+		latest := newHead(16)
+		headTracker.EXPECT().LatestAndFinalizedBlock(mock.Anything).Return(latest, finalized, nil).Once()
+		ec := evmclimocks.NewClient(t)
+		ec.EXPECT().ConfiguredChainID().Return(chainID)
+		ec.EXPECT().HeadByNumber(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, number *big.Int) (*evmtypes.Head, error) {
+			return newHead(number.Int64()), nil
+		})
+		ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once()
+		mockBatchCallContext(t, ec)
+		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
+		lp.PollAndSaveLogs(tests.Context(t), 4)
+		require.NoError(t, lp.HealthReport()[lp.Name()])
+	})
+}
+
 func benchmarkFilter(b *testing.B, nFilters, nAddresses, nEvents int) {
 	lggr := logger.Test(b)
 	lpOpts := Opts{
diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go
index 1ab548063a..b969e4ba78 100644
--- a/core/chains/evm/logpoller/log_poller_test.go
+++ b/core/chains/evm/logpoller/log_poller_test.go
@@ -1350,7 +1350,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	blockNums = []uint64{2}
 	_, err = th.LogPoller.GetBlocksRange(testutils.Context(t), blockNums)
 	require.Error(t, err)
-	assert.Equal(t, "Received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error())
+	assert.Equal(t, "received unfinalized block 2 while expecting finalized block (latestFinalizedBlockNumber = 1)", err.Error())
 
 	th.Client.Commit() // Commit block #4, so that block #2 is finalized
 
@@ -1419,12 +1419,6 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) {
 	_, err = th.LogPoller.GetBlocksRange(ctx, blockNums)
 	require.Error(t, err)
 	assert.Contains(t, err.Error(), "context canceled")
-
-	// test canceled ctx
-	ctx, cancel = context.WithCancel(testutils.Context(t))
-	cancel()
-	_, err = th.LogPoller.GetBlocksRange(ctx, blockNums)
-	require.Equal(t, err, context.Canceled)
 }
 
 func TestGetReplayFromBlock(t *testing.T) {
@@ -1679,11 +1673,11 @@ func TestTooManyLogResults(t *testing.T) {
 		crit := obs.FilterLevelExact(zapcore.DPanicLevel).All()
 		errors := obs.FilterLevelExact(zapcore.ErrorLevel).All()
 		warns := obs.FilterLevelExact(zapcore.WarnLevel).All()
-		assert.Len(t, crit, 0)
-		require.Len(t, errors, 1)
-		assert.Equal(t, errors[0].Message, "Unable to query for logs")
-		require.Len(t, warns, 1)
-		assert.Contains(t, warns[0].Message, "retrying later")
+		assert.Empty(t, crit)
+		require.Len(t, errors, 2)
+		assert.Contains(t, errors[0].Message, "Unable to query for logs")
+		assert.Contains(t, errors[1].Message, "Failed to poll and save logs, retrying later")
+		require.Empty(t, warns)
 	})
 }
 
diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go
index 0b5a8f4bd4..9761b27dc6 100644
--- a/core/chains/evm/logpoller/orm.go
+++ b/core/chains/evm/logpoller/orm.go
@@ -43,6 +43,7 @@ type ORM interface {
 	SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error)
 	SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
 	SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)
+	SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error)
 
 	SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
 	SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error)
@@ -989,6 +990,18 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim
 	return logs, nil
 }
 
+func (o *DSORM) SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error) {
+	var b LogPollerBlock
+	if err := o.ds.GetContext(ctx, &b,
+		blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= (
+			SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1
+		) ORDER BY block_number DESC LIMIT 1`), ubig.New(o.chainID),
+	); err != nil {
+		return nil, err
+	}
+	return &b, nil
+}
+
 func nestedBlockNumberQuery(confs evmtypes.Confirmations) string {
 	if confs == evmtypes.Finalized {
 		return `
diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go
index ba66e166eb..5c0b1e42fe 100644
--- a/core/chains/evm/logpoller/orm_test.go
+++ b/core/chains/evm/logpoller/orm_test.go
@@ -2168,3 +2168,35 @@ func TestSelectOldestBlock(t *testing.T) {
 		require.Equal(t, block.BlockHash, common.HexToHash("0x1233"))
 	})
 }
+
+func TestSelectLatestFinalizedBlock(t *testing.T) {
+	t.Run("If finalized block is not present in DB return error", func(t *testing.T) {
+		th := SetupTH(t, lpOpts)
+		o1 := th.ORM
+		o2 := th.ORM2
+		ctx := testutils.Context(t)
+		// o2's chain does not have finalized block
+		require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 9))
+		require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1234"), 10, time.Now(), 8))
+		// o1 has finalized blocks
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 11, time.Now(), 10))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 10, time.Now(), 10))
+		result, err := o2.SelectLatestFinalizedBlock(ctx)
+		require.ErrorIs(t, err, sql.ErrNoRows)
+		require.Nil(t, result)
+	})
+	t.Run("Returns latest finalized block even if there is no exact match by block number", func(t *testing.T) {
+		th := SetupTH(t, lpOpts)
+		o1 := th.ORM
+		ctx := testutils.Context(t)
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 12, time.Now(), 10))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 9))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 5, time.Now(), 4))
+		require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1230"), 4, time.Now(), 4))
+		result, err := o1.SelectLatestFinalizedBlock(ctx)
+		require.NoError(t, err)
+		require.NotNil(t, result)
+		require.Equal(t, int64(5), result.BlockNumber)
+		require.Equal(t, common.HexToHash("0x1231"), result.BlockHash)
+	})
+}

From 7b4e2e97effbe12395b3cc39c67aea4d1daaf657 Mon Sep 17 00:00:00 2001
From: Dmytro Haidashenko <dmytro.haidashenko@smartcontract.com>
Date: Thu, 16 Jan 2025 13:38:13 +0100
Subject: [PATCH 2/2] fixes for legacy version

---
 core/chains/evm/logpoller/log_poller.go       | 28 +++++++++----------
 .../evm/logpoller/log_poller_internal_test.go |  6 ++--
 core/chains/evm/logpoller/orm.go              |  4 +--
 3 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go
index 8b2c3bfdef..0fcead7f13 100644
--- a/core/chains/evm/logpoller/log_poller.go
+++ b/core/chains/evm/logpoller/log_poller.go
@@ -407,7 +407,7 @@ func (lp *logPoller) Replay(ctx context.Context, fromBlock int64) (err error) {
 	defer func() {
 		if errors.Is(err, context.Canceled) {
 			err = ErrReplayRequestAborted
-		} else if errors.Is(err, commontypes.ErrFinalityViolated) {
+		} else if errors.Is(err, ErrFinalityViolated) {
 			// Replay only declares finality violation and does not resolve it, as it's possible that [fromBlock, savedFinalizedBlockNumber]
 			// does not contain the violation.
 			lp.lggr.Criticalw("Replay failed due to finality violation", "fromBlock", fromBlock, "err", err)
@@ -633,7 +633,7 @@ func (lp *logPoller) run() {
 			}
 			err := lp.BackupPollAndSaveLogs(ctx)
 			switch {
-			case errors.Is(err, commontypes.ErrFinalityViolated):
+			case errors.Is(err, ErrFinalityViolated):
 				// BackupPoll only declares finality violation and does not resolve it, as it's possible that processed range
 				// does not contain the violation.
 				lp.lggr.Criticalw("Backup poll failed due to finality violation", "err", err)
@@ -816,7 +816,7 @@ func (lp *logPoller) blocksFromFinalizedLogs(ctx context.Context, logs []types.L
 
 	for i, log := range logs {
 		if log.BlockHash != blocks[i].BlockHash {
-			return nil, fmt.Errorf("finalized log produced by tx %s has block hash %s that does not match fetched block's hash %s: %w", log.TxHash, log.BlockHash, blocks[i].BlockHash, commontypes.ErrFinalityViolated)
+			return nil, fmt.Errorf("finalized log produced by tx %s has block hash %s that does not match fetched block's hash %s: %w", log.TxHash, log.BlockHash, blocks[i].BlockHash, ErrFinalityViolated)
 		}
 	}
 
@@ -949,7 +949,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
 // conditions this would be equal to lastProcessed.BlockNumber + 1.
 func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) {
 	err := lp.pollAndSaveLogs(ctx, currentBlockNumber)
-	if errors.Is(err, commontypes.ErrFinalityViolated) {
+	if errors.Is(err, ErrFinalityViolated) {
 		lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err)
 		lp.finalityViolated.Store(true)
 		lp.SvcErrBuffer.Append(err)
@@ -1107,7 +1107,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
 	}
 
 	lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
-	return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", commontypes.ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
+	return nil, fmt.Errorf("%w: finalized block hash %s does not match RPC's %s at height %d", ErrFinalityViolated, ourParentBlockHash, blockAfterLCA.Hash, blockAfterLCA.Number)
 }
 
 // PruneOldBlocks removes blocks that are > lp.keepFinalizedBlocksDepth behind the latest finalized block.
@@ -1320,13 +1320,13 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
 
 	logPollerBlocks := make(map[uint64]LogPollerBlock)
 	for _, head := range evmBlocks {
-		logPollerBlocks[uint64(head.Number)] = LogPollerBlock{
-			EvmChainId:           head.EVMChainID,
-			BlockHash:            head.Hash,
-			BlockNumber:          head.Number,
-			BlockTimestamp:       head.Timestamp,
-			FinalizedBlockNumber: head.Number, // always finalized; only matters if this block is returned by LatestBlock()
-			CreatedAt:            head.Timestamp,
+		logPollerBlocks[uint64(head.BlockNumber)] = LogPollerBlock{
+			EvmChainId:           head.EvmChainId,
+			BlockHash:            head.BlockHash,
+			BlockNumber:          head.BlockNumber,
+			BlockTimestamp:       head.BlockTimestamp,
+			FinalizedBlockNumber: head.BlockNumber, // always finalized; only matters if this block is returned by LatestBlock()
+			CreatedAt:            head.BlockTimestamp,
 		}
 	}
 	return logPollerBlocks, nil
@@ -1382,7 +1382,7 @@ func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []uint64,
 
 		if rpcChainReference.Hash != chainReference.BlockHash {
 			return nil, fmt.Errorf("expected RPC's finalized block hash at hegiht %d to be %s but got %s: %w",
-				chainReference.BlockNumber, chainReference.BlockHash, rpcChainReference.Hash, commontypes.ErrFinalityViolated)
+				chainReference.BlockNumber, chainReference.BlockHash, rpcChainReference.Hash, ErrFinalityViolated)
 		}
 
 		reqs = reqs[:len(reqs)-1] // no need to include chain reference into results
@@ -1489,7 +1489,7 @@ func ensureIdenticalBlocksBatches(fetched1, fetched2 map[uint64]*evmtypes.Head)
 		}
 
 		if head1.Hash != head2.Hash {
-			return fmt.Errorf("expected block %d to be finalized but got different hashes %s and %s from RPC: %w", num, head1.Hash, head2.Hash, commontypes.ErrFinalityViolated)
+			return fmt.Errorf("expected block %d to be finalized but got different hashes %s and %s from RPC: %w", num, head1.Hash, head2.Hash, ErrFinalityViolated)
 		}
 	}
 
diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go
index 3850ecac0d..58ee52b8a3 100644
--- a/core/chains/evm/logpoller/log_poller_internal_test.go
+++ b/core/chains/evm/logpoller/log_poller_internal_test.go
@@ -709,7 +709,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 		require.NoError(t, orm.InsertBlock(tests.Context(t), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2))
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
 		lp.PollAndSaveLogs(tests.Context(t), 4)
-		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated)
 	})
 	t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) {
 		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
@@ -729,7 +729,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 		})
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
 		lp.PollAndSaveLogs(tests.Context(t), 4)
-		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated)
 	})
 	t.Run("Log's hash does not match block's", func(t *testing.T) {
 		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
@@ -746,7 +746,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
 		mockBatchCallContext(t, ec)
 		lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
 		lp.PollAndSaveLogs(tests.Context(t), 4)
-		require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
+		require.ErrorIs(t, lp.HealthReport()[lp.Name()], ErrFinalityViolated)
 	})
 	t.Run("Happy path", func(t *testing.T) {
 		lggr, _ := logger.TestObserved(t, zapcore.ErrorLevel)
diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go
index 9761b27dc6..20ed4f5951 100644
--- a/core/chains/evm/logpoller/orm.go
+++ b/core/chains/evm/logpoller/orm.go
@@ -993,9 +993,9 @@ func (o *DSORM) FilteredLogs(ctx context.Context, filter []query.Expression, lim
 func (o *DSORM) SelectLatestFinalizedBlock(ctx context.Context) (*LogPollerBlock, error) {
 	var b LogPollerBlock
 	if err := o.ds.GetContext(ctx, &b,
-		blocksQuery(`WHERE evm_chain_id = $1 AND block_number <= (
+		`SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number <= (
 			SELECT finalized_block_number FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1
-		) ORDER BY block_number DESC LIMIT 1`), ubig.New(o.chainID),
+		) ORDER BY block_number DESC LIMIT 1`, ubig.New(o.chainID),
 	); err != nil {
 		return nil, err
 	}