From a4e72132199db847699862e3927996242c5a004c Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 26 Jan 2024 17:18:31 +0800 Subject: [PATCH] fix(op-node): pre-fetching handle L1 reOrg (#115) Co-authored-by: Welkin --- op-node/sources/caching/pre_fetch_cache.go | 3 +- op-node/sources/l1_client.go | 44 +++++- op-node/sources/l1_client_test.go | 167 +++++++++++++++++++++ 3 files changed, 208 insertions(+), 6 deletions(-) create mode 100644 op-node/sources/l1_client_test.go diff --git a/op-node/sources/caching/pre_fetch_cache.go b/op-node/sources/caching/pre_fetch_cache.go index 19842b1a66c7..fa24a612f941 100644 --- a/op-node/sources/caching/pre_fetch_cache.go +++ b/op-node/sources/caching/pre_fetch_cache.go @@ -43,7 +43,8 @@ func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFu defer v.lock.Unlock() v.lock.Lock() if _, ok := v.inner[key]; ok { - return false, false + v.inner[key] = value + return true, false } if v.queue.Size() >= v.maxSize { return false, true diff --git a/op-node/sources/l1_client.go b/op-node/sources/l1_client.go index 5c8bc95cd760..108704bcc142 100644 --- a/op-node/sources/l1_client.go +++ b/op-node/sources/l1_client.go @@ -48,6 +48,8 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide } } +const sequencerConfDepth = 15 + // L1Client provides typed bindings to retrieve L1 data from an RPC source, // with optimized batch requests, cached results, and flag to not trust the RPC // (i.e. to verify all returned contents against corresponding block hashes). @@ -139,6 +141,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 s.log.Info("pre-fetching receipts start", "startBlock", l1Start) go func() { var currentL1Block uint64 + var parentHash *common.Hash for { select { case <-s.done: @@ -147,6 +150,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 case currentL1Block = <-s.preFetchReceiptsStartBlockChan: s.log.Debug("pre-fetching receipts currentL1Block changed", "block", currentL1Block) s.receiptsCache.RemoveAll() + parentHash = nil default: blockRef, err := s.L1BlockRefByLabel(ctx, eth.Unsafe) if err != nil { @@ -169,6 +173,9 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 taskCount = int(blockRef.Number-currentL1Block) + 1 } + blockInfoChan := make(chan eth.L1BlockRef, taskCount) + oldestFetchBlockNumber := currentL1Block + var wg sync.WaitGroup for i := 0; i < taskCount; i++ { wg.Add(1) @@ -179,15 +186,17 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 case <-s.done: return default: - if _, ok := s.receiptsCache.Get(blockNumber); ok { - return - } + pair, ok := s.receiptsCache.Get(blockNumber) blockInfo, err := s.L1BlockRefByNumber(ctx, blockNumber) if err != nil { s.log.Debug("failed to fetch block ref", "err", err, "blockNumber", blockNumber) time.Sleep(1 * time.Second) continue } + if ok && pair.blockHash == blockInfo.Hash { + return + } + isSuccess, err := s.PreFetchReceipts(ctx, blockInfo.Hash) if err != nil { s.log.Warn("failed to pre-fetch receipts", "err", err) @@ -198,14 +207,39 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 time.Sleep(1 * time.Second) continue } - s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number) - break + s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number, "hash", blockInfo.Hash) + blockInfoChan <- blockInfo + return } } }(ctx, currentL1Block) currentL1Block = currentL1Block + 1 } wg.Wait() + close(blockInfoChan) + + //try to find out l1 reOrg and return to an earlier block height for re-prefetching + var latestBlockHash common.Hash + latestBlockNumber := uint64(0) + var oldestBlockParentHash common.Hash + for l1BlockInfo := range blockInfoChan { + if l1BlockInfo.Number > latestBlockNumber { + latestBlockHash = l1BlockInfo.Hash + latestBlockNumber = l1BlockInfo.Number + } + if l1BlockInfo.Number == oldestFetchBlockNumber { + oldestBlockParentHash = l1BlockInfo.ParentHash + } + } + + s.log.Debug("pre-fetching receipts hash", "latestBlockHash", latestBlockHash, "latestBlockNumber", latestBlockNumber, "oldestBlockNumber", oldestFetchBlockNumber, "oldestBlockParentHash", oldestBlockParentHash) + if parentHash != nil && oldestBlockParentHash != (common.Hash{}) && oldestBlockParentHash != *parentHash && currentL1Block >= sequencerConfDepth+uint64(taskCount) { + currentL1Block = currentL1Block - sequencerConfDepth - uint64(taskCount) + s.log.Warn("pre-fetching receipts found l1 reOrg, return to an earlier block height for re-prefetching", "recordParentHash", *parentHash, "unsafeParentHash", oldestBlockParentHash, "number", oldestFetchBlockNumber, "backToNumber", currentL1Block) + parentHash = nil + continue + } + parentHash = &latestBlockHash } } }() diff --git a/op-node/sources/l1_client_test.go b/op-node/sources/l1_client_test.go new file mode 100644 index 000000000000..b5e7fd3c7bc0 --- /dev/null +++ b/op-node/sources/l1_client_test.go @@ -0,0 +1,167 @@ +package sources + +import ( + "context" + "testing" + "time" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/testlog" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestGoOrUpdatePreFetchReceipts(t *testing.T) { + t.Run("handleReOrg", func(t *testing.T) { + m := new(mockRPC) + ctx := context.Background() + clientLog := testlog.Logger(t, log.LvlDebug) + latestHead := &rpcHeader{ + ParentHash: randHash(), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: 100, + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{"latest", false}).Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = latestHead + }).Return([]error{nil}) + for i := 81; i <= 90; i++ { + currentHead := &rpcHeader{ + ParentHash: randHash(), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: hexutil.Uint64(i), + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + currentBlock := &rpcBlock{ + rpcHeader: *currentHead, + Transactions: []*types.Transaction{}, + } + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = currentHead + }).Return([]error{nil}) + m.On("CallContext", ctx, new(*rpcBlock), + "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcBlock) = currentBlock + }).Return([]error{nil}) + } + for i := 91; i <= 100; i++ { + currentHead := &rpcHeader{ + ParentHash: randHash(), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: hexutil.Uint64(i), + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = currentHead + }).Return([]error{nil}) + currentBlock := &rpcBlock{ + rpcHeader: *currentHead, + Transactions: []*types.Transaction{}, + } + m.On("CallContext", ctx, new(*rpcBlock), + "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcBlock) = currentBlock + }).Return([]error{nil}) + } + var lastParentHeader common.Hash + var real100Hash common.Hash + for i := 76; i <= 100; i++ { + currentHead := &rpcHeader{ + ParentHash: lastParentHeader, + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: hexutil.Uint64(i), + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + if i == 100 { + real100Hash = currentHead.Hash + } + lastParentHeader = currentHead.Hash + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = currentHead + }).Return([]error{nil}) + currentBlock := &rpcBlock{ + rpcHeader: *currentHead, + Transactions: []*types.Transaction{}, + } + m.On("CallContext", ctx, new(*rpcBlock), + "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcBlock) = currentBlock + }).Return([]error{nil}) + } + s, err := NewL1Client(m, clientLog, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 1000}, true, RPCKindBasic)) + require.NoError(t, err) + err2 := s.GoOrUpdatePreFetchReceipts(ctx, 81) + require.NoError(t, err2) + time.Sleep(1 * time.Second) + pair, ok := s.receiptsCache.Get(100) + require.True(t, ok, "100 cache miss") + require.Equal(t, real100Hash, pair.blockHash, "block 100 hash is different,want:%s,but:%s", real100Hash, pair.blockHash) + _, ok2 := s.receiptsCache.Get(76) + require.True(t, ok2, "76 cache miss") + }) +}