From eb9e0c40c39d24c40957fcccde7be5e9f0f7122c Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Tue, 19 Mar 2024 15:24:22 +0800 Subject: [PATCH] feat(op-node): pre-fetch block info and transaction (#163) Co-authored-by: Welkin --- op-service/sources/caching/cache.go | 12 +++++++++++ op-service/sources/caching/pre_fetch_cache.go | 4 ++-- op-service/sources/eth_client.go | 21 +++++++++++++------ op-service/sources/eth_client_test.go | 2 +- op-service/sources/l1_client.go | 5 +++-- op-service/sources/l1_client_test.go | 4 ++-- op-service/sources/l2_client.go | 2 +- op-service/sources/receipts_test.go | 2 +- ops-bedrock/Dockerfile.l2 | 2 +- 9 files changed, 38 insertions(+), 16 deletions(-) diff --git a/op-service/sources/caching/cache.go b/op-service/sources/caching/cache.go index 3863a523997c..02cffa51dd14 100644 --- a/op-service/sources/caching/cache.go +++ b/op-service/sources/caching/cache.go @@ -22,6 +22,18 @@ func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) { return value, ok } +func (c *LRUCache[K,V]) GetOrPeek(key K, usePeek bool, recordMetrics bool) (value V, ok bool) { + if usePeek { + value, ok = c.inner.Peek(key) + } else { + value, ok = c.inner.Get(key) + } + if c.m != nil && recordMetrics { + c.m.CacheGet(c.label, ok) + } + return value, ok +} + func (c *LRUCache[K, V]) Add(key K, value V) (evicted bool) { evicted = c.inner.Add(key, value) if c.m != nil { diff --git a/op-service/sources/caching/pre_fetch_cache.go b/op-service/sources/caching/pre_fetch_cache.go index fa24a612f941..ad928fe1fea7 100644 --- a/op-service/sources/caching/pre_fetch_cache.go +++ b/op-service/sources/caching/pre_fetch_cache.go @@ -57,11 +57,11 @@ func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFu return true, false } -func (v *PreFetchCache[V]) Get(key uint64) (V, bool) { +func (v *PreFetchCache[V]) Get(key uint64, recordMetrics bool) (V, bool) { defer v.lock.Unlock() v.lock.Lock() value, ok := v.inner[key] - if v.m != nil { + if v.m != nil && recordMetrics { v.m.CacheGet(v.label, ok) } return value, ok diff --git a/op-service/sources/eth_client.go b/op-service/sources/eth_client.go index a9307f955a48..e1d749ad0324 100644 --- a/op-service/sources/eth_client.go +++ b/op-service/sources/eth_client.go @@ -138,6 +138,10 @@ type EthClient struct { // [OPTIONAL] The reth DB path to fetch receipts from rethDbPath string + + // isReadOrderly Indicates whether the client often reads data in order of block height. + // If so, the process of reading the cache will be different to ensure a high cache hit rate. + isReadOrderly bool } func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod { @@ -166,7 +170,7 @@ func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) { // NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging, // metric tracking, and caching. The [EthClient] uses a [LimitRPC] wrapper to limit the number of concurrent RPC requests. -func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) (*EthClient, error) { +func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig, isReadOrderly bool) (*EthClient, error) { if err := config.Check(); err != nil { return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err) } @@ -186,6 +190,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co lastMethodsReset: time.Now(), methodResetDuration: config.MethodResetDuration, rethDbPath: config.RethDBPath, + isReadOrderly: isReadOrderly, }, nil } @@ -298,7 +303,7 @@ func (s *EthClient) ChainID(ctx context.Context) (*big.Int, error) { } func (s *EthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) { - if header, ok := s.headersCache.Get(hash); ok { + if header, ok := s.headersCache.GetOrPeek(hash, s.isReadOrderly, true); ok { return header, nil } return s.headerCall(ctx, "eth_getBlockByHash", hashID(hash)) @@ -323,8 +328,12 @@ func (s *EthClient) BSCInfoByLabel(ctx context.Context, label eth.BlockLabel) (e } func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) { - if header, ok := s.headersCache.Get(hash); ok { - if txs, ok := s.transactionsCache.Get(hash); ok { + return s.infoAndTxsByHash(ctx, hash, true) +} + +func (s *EthClient) infoAndTxsByHash(ctx context.Context, hash common.Hash, recordMetrics bool) (eth.BlockInfo, types.Transactions, error) { + if header, ok := s.headersCache.GetOrPeek(hash, s.isReadOrderly, recordMetrics); ok { + if txs, ok := s.transactionsCache.GetOrPeek(hash, s.isReadOrderly, recordMetrics); ok { return header, txs, nil } } @@ -380,7 +389,7 @@ func (s *EthClient) PreFetchReceipts(ctx context.Context, blockHash common.Hash) } func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Hash, isForPreFetch bool) (eth.BlockInfo, types.Receipts, error, bool) { - info, txs, err := s.InfoAndTxsByHash(ctx, blockHash) + info, txs, err := s.infoAndTxsByHash(ctx, blockHash, !isForPreFetch) if err != nil { return nil, nil, err, false } @@ -389,7 +398,7 @@ func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Has // The underlying fetcher uses the receipts hash to verify receipt integrity. var job *receiptsFetchingJob var isFull bool - v, ok := s.receiptsCache.Get(info.NumberU64()) + v, ok := s.receiptsCache.Get(info.NumberU64(), !isForPreFetch) if ok && v.blockHash == blockHash { job = v.job } else { diff --git a/op-service/sources/eth_client_test.go b/op-service/sources/eth_client_test.go index 11447f8a2e51..13749fbc70fb 100644 --- a/op-service/sources/eth_client_test.go +++ b/op-service/sources/eth_client_test.go @@ -110,7 +110,7 @@ func TestEthClient_InfoByHash(t *testing.T) { "eth_getBlockByHash", []any{rhdr.Hash, false}).Run(func(args mock.Arguments) { *args[1].(**rpcHeader) = rhdr }).Return([]error{nil}) - s, err := NewEthClient(m, nil, nil, testEthClientConfig) + s, err := NewEthClient(m, nil, nil, testEthClientConfig, false) require.NoError(t, err) info, err := s.InfoByHash(ctx, rhdr.Hash) require.NoError(t, err) diff --git a/op-service/sources/l1_client.go b/op-service/sources/l1_client.go index 2a025c3e5261..75e963e2ff2b 100644 --- a/op-service/sources/l1_client.go +++ b/op-service/sources/l1_client.go @@ -73,7 +73,7 @@ type L1Client struct { // NewL1Client wraps a RPC with bindings to fetch L1 data, while logging errors, tracking metrics (optional), and caching. func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, config *L1ClientConfig) (*L1Client, error) { - ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig) + ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig, true) if err != nil { return nil, err } @@ -187,13 +187,13 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 case <-s.done: return default: - pair, ok := s.receiptsCache.Get(blockNumber) blockInfo, err := s.L1BlockRefByNumber(ctx, blockNumber) if err != nil { s.log.Debug("failed to fetch block ref", "err", err, "blockNumber", blockNumber) time.Sleep(1 * time.Second) continue } + pair, ok := s.receiptsCache.Get(blockNumber, false) if ok && pair.blockHash == blockInfo.Hash { blockInfoChan <- blockInfo return @@ -251,6 +251,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 } func (s *L1Client) ClearReceiptsCacheBefore(blockNumber uint64) { + s.log.Debug("clear receipts cache before", "blockNumber", blockNumber) s.receiptsCache.RemoveLessThan(blockNumber) } diff --git a/op-service/sources/l1_client_test.go b/op-service/sources/l1_client_test.go index cf1a28006de9..496bd9a14e99 100644 --- a/op-service/sources/l1_client_test.go +++ b/op-service/sources/l1_client_test.go @@ -158,10 +158,10 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) { err2 := s.GoOrUpdatePreFetchReceipts(ctx, 81) require.NoError(t, err2) time.Sleep(1 * time.Second) - pair, ok := s.receiptsCache.Get(100) + pair, ok := s.receiptsCache.Get(100, false) require.True(t, ok, "100 cache miss") require.Equal(t, real100Hash, pair.blockHash, "block 100 hash is different,want:%s,but:%s", real100Hash, pair.blockHash) - _, ok2 := s.receiptsCache.Get(76) + _, ok2 := s.receiptsCache.Get(76, false) require.True(t, ok2, "76 cache miss") }) } diff --git a/op-service/sources/l2_client.go b/op-service/sources/l2_client.go index 5d108d3ca338..202fe06357f9 100644 --- a/op-service/sources/l2_client.go +++ b/op-service/sources/l2_client.go @@ -79,7 +79,7 @@ type L2Client struct { // for fetching and caching eth.L2BlockRef values. This includes fetching an L2BlockRef by block number, label, or hash. // See: [L2BlockRefByLabel], [L2BlockRefByNumber], [L2BlockRefByHash] func NewL2Client(client client.RPC, log log.Logger, metrics caching.Metrics, config *L2ClientConfig) (*L2Client, error) { - ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig) + ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig, false) if err != nil { return nil, err } diff --git a/op-service/sources/receipts_test.go b/op-service/sources/receipts_test.go index 1861f2363161..fe79c136c096 100644 --- a/op-service/sources/receipts_test.go +++ b/op-service/sources/receipts_test.go @@ -162,7 +162,7 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) { testCfg.MethodResetDuration = 0 } logger := testlog.Logger(t, log.LvlError) - ethCl, err := NewEthClient(client.NewBaseRPCClient(cl), logger, nil, testCfg) + ethCl, err := NewEthClient(client.NewBaseRPCClient(cl), logger, nil, testCfg, true) require.NoError(t, err) defer ethCl.Close() diff --git a/ops-bedrock/Dockerfile.l2 b/ops-bedrock/Dockerfile.l2 index 1d8a84fed489..424346fc59c7 100644 --- a/ops-bedrock/Dockerfile.l2 +++ b/ops-bedrock/Dockerfile.l2 @@ -1,4 +1,4 @@ -FROM --platform=linux/amd64 ghcr.io/bnb-chain/op-geth:latest +FROM --platform=linux/amd64 ghcr.io/bnb-chain/op-geth:develop RUN apk add --no-cache jq