Skip to content

Commit

Permalink
feat(op-node): pre-fetch block info and transaction (ethereum-optimis…
Browse files Browse the repository at this point in the history
…m#163)

Co-authored-by: Welkin <[email protected]>
  • Loading branch information
welkin22 and Welkin authored Mar 19, 2024
1 parent 190c53c commit eb9e0c4
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 16 deletions.
12 changes: 12 additions & 0 deletions op-service/sources/caching/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ func (c *LRUCache[K, V]) Get(key K) (value V, ok bool) {
return value, ok
}

func (c *LRUCache[K,V]) GetOrPeek(key K, usePeek bool, recordMetrics bool) (value V, ok bool) {
if usePeek {
value, ok = c.inner.Peek(key)
} else {
value, ok = c.inner.Get(key)
}
if c.m != nil && recordMetrics {
c.m.CacheGet(c.label, ok)
}
return value, ok
}

func (c *LRUCache[K, V]) Add(key K, value V) (evicted bool) {
evicted = c.inner.Add(key, value)
if c.m != nil {
Expand Down
4 changes: 2 additions & 2 deletions op-service/sources/caching/pre_fetch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFu
return true, false
}

func (v *PreFetchCache[V]) Get(key uint64) (V, bool) {
func (v *PreFetchCache[V]) Get(key uint64, recordMetrics bool) (V, bool) {
defer v.lock.Unlock()
v.lock.Lock()
value, ok := v.inner[key]
if v.m != nil {
if v.m != nil && recordMetrics {
v.m.CacheGet(v.label, ok)
}
return value, ok
Expand Down
21 changes: 15 additions & 6 deletions op-service/sources/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ type EthClient struct {

// [OPTIONAL] The reth DB path to fetch receipts from
rethDbPath string

// isReadOrderly Indicates whether the client often reads data in order of block height.
// If so, the process of reading the cache will be different to ensure a high cache hit rate.
isReadOrderly bool
}

func (s *EthClient) PickReceiptsMethod(txCount uint64) ReceiptsFetchingMethod {
Expand Down Expand Up @@ -166,7 +170,7 @@ func (s *EthClient) OnReceiptsMethodErr(m ReceiptsFetchingMethod, err error) {

// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
// metric tracking, and caching. The [EthClient] uses a [LimitRPC] wrapper to limit the number of concurrent RPC requests.
func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig) (*EthClient, error) {
func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, config *EthClientConfig, isReadOrderly bool) (*EthClient, error) {
if err := config.Check(); err != nil {
return nil, fmt.Errorf("bad config, cannot create L1 source: %w", err)
}
Expand All @@ -186,6 +190,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
lastMethodsReset: time.Now(),
methodResetDuration: config.MethodResetDuration,
rethDbPath: config.RethDBPath,
isReadOrderly: isReadOrderly,
}, nil
}

Expand Down Expand Up @@ -298,7 +303,7 @@ func (s *EthClient) ChainID(ctx context.Context) (*big.Int, error) {
}

func (s *EthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) {
if header, ok := s.headersCache.Get(hash); ok {
if header, ok := s.headersCache.GetOrPeek(hash, s.isReadOrderly, true); ok {
return header, nil
}
return s.headerCall(ctx, "eth_getBlockByHash", hashID(hash))
Expand All @@ -323,8 +328,12 @@ func (s *EthClient) BSCInfoByLabel(ctx context.Context, label eth.BlockLabel) (e
}

func (s *EthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) {
if header, ok := s.headersCache.Get(hash); ok {
if txs, ok := s.transactionsCache.Get(hash); ok {
return s.infoAndTxsByHash(ctx, hash, true)
}

func (s *EthClient) infoAndTxsByHash(ctx context.Context, hash common.Hash, recordMetrics bool) (eth.BlockInfo, types.Transactions, error) {
if header, ok := s.headersCache.GetOrPeek(hash, s.isReadOrderly, recordMetrics); ok {
if txs, ok := s.transactionsCache.GetOrPeek(hash, s.isReadOrderly, recordMetrics); ok {
return header, txs, nil
}
}
Expand Down Expand Up @@ -380,7 +389,7 @@ func (s *EthClient) PreFetchReceipts(ctx context.Context, blockHash common.Hash)
}

func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Hash, isForPreFetch bool) (eth.BlockInfo, types.Receipts, error, bool) {
info, txs, err := s.InfoAndTxsByHash(ctx, blockHash)
info, txs, err := s.infoAndTxsByHash(ctx, blockHash, !isForPreFetch)
if err != nil {
return nil, nil, err, false
}
Expand All @@ -389,7 +398,7 @@ func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Has
// The underlying fetcher uses the receipts hash to verify receipt integrity.
var job *receiptsFetchingJob
var isFull bool
v, ok := s.receiptsCache.Get(info.NumberU64())
v, ok := s.receiptsCache.Get(info.NumberU64(), !isForPreFetch)
if ok && v.blockHash == blockHash {
job = v.job
} else {
Expand Down
2 changes: 1 addition & 1 deletion op-service/sources/eth_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestEthClient_InfoByHash(t *testing.T) {
"eth_getBlockByHash", []any{rhdr.Hash, false}).Run(func(args mock.Arguments) {
*args[1].(**rpcHeader) = rhdr
}).Return([]error{nil})
s, err := NewEthClient(m, nil, nil, testEthClientConfig)
s, err := NewEthClient(m, nil, nil, testEthClientConfig, false)
require.NoError(t, err)
info, err := s.InfoByHash(ctx, rhdr.Hash)
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions op-service/sources/l1_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type L1Client struct {

// NewL1Client wraps a RPC with bindings to fetch L1 data, while logging errors, tracking metrics (optional), and caching.
func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, config *L1ClientConfig) (*L1Client, error) {
ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig)
ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -187,13 +187,13 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
case <-s.done:
return
default:
pair, ok := s.receiptsCache.Get(blockNumber)
blockInfo, err := s.L1BlockRefByNumber(ctx, blockNumber)
if err != nil {
s.log.Debug("failed to fetch block ref", "err", err, "blockNumber", blockNumber)
time.Sleep(1 * time.Second)
continue
}
pair, ok := s.receiptsCache.Get(blockNumber, false)
if ok && pair.blockHash == blockInfo.Hash {
blockInfoChan <- blockInfo
return
Expand Down Expand Up @@ -251,6 +251,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6
}

func (s *L1Client) ClearReceiptsCacheBefore(blockNumber uint64) {
s.log.Debug("clear receipts cache before", "blockNumber", blockNumber)
s.receiptsCache.RemoveLessThan(blockNumber)
}

Expand Down
4 changes: 2 additions & 2 deletions op-service/sources/l1_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ func TestGoOrUpdatePreFetchReceipts(t *testing.T) {
err2 := s.GoOrUpdatePreFetchReceipts(ctx, 81)
require.NoError(t, err2)
time.Sleep(1 * time.Second)
pair, ok := s.receiptsCache.Get(100)
pair, ok := s.receiptsCache.Get(100, false)
require.True(t, ok, "100 cache miss")
require.Equal(t, real100Hash, pair.blockHash, "block 100 hash is different,want:%s,but:%s", real100Hash, pair.blockHash)
_, ok2 := s.receiptsCache.Get(76)
_, ok2 := s.receiptsCache.Get(76, false)
require.True(t, ok2, "76 cache miss")
})
}
2 changes: 1 addition & 1 deletion op-service/sources/l2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type L2Client struct {
// for fetching and caching eth.L2BlockRef values. This includes fetching an L2BlockRef by block number, label, or hash.
// See: [L2BlockRefByLabel], [L2BlockRefByNumber], [L2BlockRefByHash]
func NewL2Client(client client.RPC, log log.Logger, metrics caching.Metrics, config *L2ClientConfig) (*L2Client, error) {
ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig)
ethClient, err := NewEthClient(client, log, metrics, &config.EthClientConfig, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion op-service/sources/receipts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (tc *ReceiptsTestCase) Run(t *testing.T) {
testCfg.MethodResetDuration = 0
}
logger := testlog.Logger(t, log.LvlError)
ethCl, err := NewEthClient(client.NewBaseRPCClient(cl), logger, nil, testCfg)
ethCl, err := NewEthClient(client.NewBaseRPCClient(cl), logger, nil, testCfg, true)
require.NoError(t, err)
defer ethCl.Close()

Expand Down
2 changes: 1 addition & 1 deletion ops-bedrock/Dockerfile.l2
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM --platform=linux/amd64 ghcr.io/bnb-chain/op-geth:latest
FROM --platform=linux/amd64 ghcr.io/bnb-chain/op-geth:develop

RUN apk add --no-cache jq

Expand Down

0 comments on commit eb9e0c4

Please sign in to comment.