From 7947c25316340ac0adaf6685759c265b541bd676 Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Thu, 21 Dec 2023 14:46:13 +0800 Subject: [PATCH 01/11] feature(op-node): pre-fetch receipts concurrently (#100) * feature(op-node): concurrent pre-fetch receipts * use background ctx in GoOrUpdatePreFetchReceipts * change MaxConcurrentRequests from 10 to 20 --------- Co-authored-by: Welkin --- op-node/rollup/derive/engine_queue.go | 2 +- op-node/sources/l1_client.go | 23 +++++++++++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 1fe213d8430d..36208a807c59 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -787,7 +787,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System if err != nil { return NewTemporaryError(fmt.Errorf("failed to fetch L1 config of L2 block %s: %w", pipelineL2.ID(), err)) } - err2 := eq.l1Fetcher.GoOrUpdatePreFetchReceipts(ctx, pipelineOrigin.Number) + err2 := eq.l1Fetcher.GoOrUpdatePreFetchReceipts(context.Background(), pipelineOrigin.Number) if err2 != nil { return NewTemporaryError(fmt.Errorf("failed to run pre fetch L1 receipts for L1 start block %s: %w", pipelineOrigin.ID(), err2)) } diff --git a/op-node/sources/l1_client.go b/op-node/sources/l1_client.go index 522b77a20614..e746b80d844f 100644 --- a/op-node/sources/l1_client.go +++ b/op-node/sources/l1_client.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" + "golang.org/x/time/rate" "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/eth" @@ -38,7 +39,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide HeadersCacheSize: span, PayloadsCacheSize: span, MaxRequestsPerBatch: 20, // TODO: tune batch param - MaxConcurrentRequests: 10, + MaxConcurrentRequests: 20, TrustRPC: trustRPC, MustBePostMerge: false, RPCProviderKind: kind, @@ -61,6 +62,8 @@ type L1Client struct { //ensure pre-fetch receipts only once preFetchReceiptsOnce sync.Once + //control the number of concurrent pre-fetch receipt requests. 
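// ---- Editor's note: illustration, not part of the patch -------------------
// This patch wires a golang.org/x/time/rate token bucket into L1Client to
// pace how quickly pre-fetch goroutines are launched; with the new
// MaxConcurrentRequests default of 20, the limiter is sized at 20/2 = 10.
// A minimal sketch of the same pattern, with illustrative names
// (prefetchLoop, prefetch):
package sketch

import (
	"context"

	"golang.org/x/time/rate"
)

func prefetchLoop(ctx context.Context, startBlock uint64, prefetch func(context.Context, uint64)) error {
	limiter := rate.NewLimiter(rate.Limit(10), 10) // refill 10 tokens/s, burst 10
	for block := startBlock; ; block++ {
		if err := limiter.Wait(ctx); err != nil {
			return err // context cancelled or deadline exceeded
		}
		go prefetch(ctx, block) // fire-and-forget; Wait paces the launches
	}
}

// Caveat: a rate.Limiter bounds the launch rate, not the number of requests
// in flight at once; patch 04 below replaces it with WaitGroup-joined
// batches, which do bound concurrency.
// ----------------------------------------------------------------------------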
+ preFetchReceiptsRateLimiter *rate.Limiter //start block for pre-fetch receipts preFetchReceiptsStartBlockChan chan uint64 //done chan @@ -79,6 +82,7 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con l1BlockRefsCache: caching.NewLRUCache(metrics, "blockrefs", config.L1BlockRefsCacheSize), preFetchReceiptsOnce: sync.Once{}, preFetchReceiptsStartBlockChan: make(chan uint64, 1), + preFetchReceiptsRateLimiter: rate.NewLimiter(rate.Limit(config.MaxConcurrentRequests/2), config.MaxConcurrentRequests/2), done: make(chan struct{}), }, nil } @@ -151,13 +155,20 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 time.Sleep(3 * time.Second) continue } - _, _, err = s.FetchReceipts(ctx, blockInfo.Hash) - if err != nil { - s.log.Warn("failed to pre-fetch receipts", "err", err) - time.Sleep(200 * time.Millisecond) + waitErr := s.preFetchReceiptsRateLimiter.Wait(ctx) + if waitErr != nil { + s.log.Warn("failed to wait pre-fetch receipts rateLimiter", "err", waitErr) continue } - s.log.Debug("pre-fetching receipts", "block", currentL1Block) + + go func(ctx context.Context, blockInfo eth.L1BlockRef) { + _, _, err = s.FetchReceipts(ctx, blockInfo.Hash) + if err != nil { + s.log.Warn("failed to pre-fetch receipts", "err", err) + return + } + s.log.Debug("pre-fetching receipts", "block", currentL1Block) + }(ctx, blockInfo) currentL1Block = currentL1Block + 1 } } From 5f592b52afd6f6ef723964bebcc53d8b371efb8e Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Tue, 2 Jan 2024 16:04:05 +0800 Subject: [PATCH 02/11] opt(op-node): refactor sequencer step schedule (#101) --- op-node/rollup/driver/sequencer.go | 11 ++++++++ op-node/rollup/driver/state.go | 42 +++++++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/op-node/rollup/driver/sequencer.go b/op-node/rollup/driver/sequencer.go index 3d5b30aa0b03..992706e98ad8 100644 --- a/op-node/rollup/driver/sequencer.go +++ b/op-node/rollup/driver/sequencer.go @@ -45,6 +45,9 @@ type Sequencer struct { timeNow func() time.Time nextAction time.Time + + // if accEmptyBlocks>10, will delay nextAction 600ms for full block building + accEmptyBlocks int } func NewSequencer(log log.Logger, cfg *rollup.Config, engine derive.ResettableEngineControl, attributesBuilder derive.AttributesBuilder, l1OriginSelector L1OriginSelectorIface, metrics SequencerMetrics) *Sequencer { @@ -227,6 +230,9 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP return nil, nil } else { d.attrBuilder.CachePayloadByHash(payload) + if len(payload.Transactions) == 1 { + d.accEmptyBlocks += 1 + } d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions)) return payload, nil } @@ -249,6 +255,11 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP } } else { parent, buildingID, _ := d.engine.BuildingPayload() // we should have a new payload ID now that we're building a block + if d.accEmptyBlocks > 10 { + d.nextAction = d.timeNow().Add(600 * time.Millisecond) + d.accEmptyBlocks = 0 + d.log.Info("sequencer delay next action 600ms and reset accEmptyBlocks") + } d.log.Info("sequencer started building new block", "payload_id", buildingID, "l2_parent_block", parent, "l2_parent_block_time", parent.Time) } return nil, nil diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go index 0afb47bb1793..b384b233ad68 
100644 --- a/op-node/rollup/driver/state.go +++ b/op-node/rollup/driver/state.go @@ -269,7 +269,24 @@ func (s *Driver) eventLoop() { // This may adjust at any time based on fork-choice changes or previous errors. // // update sequencer time if the head changed - planSequencerAction() + delay := s.sequencer.PlanNextSequencerAction() + if delay == 0 { + select { + case newL1Head := <-s.l1HeadSig: // sequencerStep may depend on this when FindL1Origin + s.l1State.HandleNewL1HeadBlock(newL1Head) + reqStep() // a new L1 head may mean we have the data to not get an EOF again. + continue + default: + // immediately do sequencerStep if time is ready + if err := sequencerStep(); err != nil { + return + } + // sequencerStep was already done, so we continue to next round + continue + } + } else { + planSequencerAction() + } } } else { sequencerCh = nil @@ -282,8 +299,8 @@ func (s *Driver) eventLoop() { altSyncTicker.Reset(syncCheckInterval) } + // Following cases are high priority if s.driverConfig.SequencerPriority { - // help sequencerStep not interrupt by other steps select { case <-sequencerCh: if err := sequencerStep(); err != nil { @@ -294,6 +311,25 @@ func (s *Driver) eventLoop() { s.l1State.HandleNewL1HeadBlock(newL1Head) reqStep() // a new L1 head may mean we have the data to not get an EOF again. continue + case respCh := <-s.stopSequencer: + if s.driverConfig.SequencerStopped { + respCh <- hashAndError{err: errors.New("sequencer not running")} + } else { + if err := s.sequencerNotifs.SequencerStopped(); err != nil { + respCh <- hashAndError{err: fmt.Errorf("sequencer start notification: %w", err)} + continue + } + s.log.Warn("Sequencer has been stopped") + s.driverConfig.SequencerStopped = true + respCh <- hashAndError{hash: s.derivation.UnsafeL2Head().Hash} + } + continue + case respCh := <-s.sequencerActive: + respCh <- !s.driverConfig.SequencerStopped + continue + case respCh := <-s.stateReq: + respCh <- struct{}{} + continue default: } } @@ -334,7 +370,7 @@ func (s *Driver) eventLoop() { s.metrics.SetDerivationIdle(false) s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts) stepCtx := context.Background() - if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped { + if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped && s.driverConfig.SequencerPriority { var cancelStep context.CancelFunc stepCtx, cancelStep = context.WithTimeout(ctx, 3*time.Second) defer cancelStep() From 387eb6ce6691ac7acbb76dadd7c65b52753d074f Mon Sep 17 00:00:00 2001 From: krish-z <122767080+krish-nr@users.noreply.github.com> Date: Tue, 2 Jan 2024 16:37:29 +0800 Subject: [PATCH 03/11] optimize: extended expire time for sequencer block broadcasting (#106) * config: change block validator payload from 60s to 20 min * config: increase seenMessagesTTL AND blockHeightLRU to avoid old block validation --- op-node/p2p/gossip.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/op-node/p2p/gossip.go b/op-node/p2p/gossip.go index 36694156a372..c0b7d5aa5ae8 100644 --- a/op-node/p2p/gossip.go +++ b/op-node/p2p/gossip.go @@ -34,8 +34,8 @@ const ( globalValidateThrottle = 512 gossipHeartbeat = 500 * time.Millisecond // seenMessagesTTL limits the duration that message IDs are remembered for gossip deduplication purposes - // 130 * gossipHeartbeat - seenMessagesTTL = 130 * gossipHeartbeat + // 2500 * gossipHeartbeat + seenMessagesTTL = 2500 * gossipHeartbeat DefaultMeshD = 8 // topic stable mesh target count DefaultMeshDlo 
= 6 // topic stable mesh low watermark DefaultMeshDhi = 12 // topic stable mesh high watermark @@ -242,7 +242,7 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti // Seen block hashes per block height // uint64 -> *seenBlocks - blockHeightLRU, err := lru.New(1000) + blockHeightLRU, err := lru.New(1500) if err != nil { panic(fmt.Errorf("failed to set up block height LRU cache: %w", err)) } @@ -291,8 +291,8 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti // rounding down to seconds is fine here. now := uint64(time.Now().Unix()) - // [REJECT] if the `payload.timestamp` is older than 60 seconds in the past - if uint64(payload.Timestamp) < now-60 { + // [REJECT] if the `payload.timestamp` is older than 20 min in the past + if uint64(payload.Timestamp) < now-1200 { log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp)) return pubsub.ValidationReject } From e8d466b38493bb5a4cb9978efbfc7d0a1cba80ad Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Tue, 2 Jan 2024 19:31:10 +0800 Subject: [PATCH 04/11] feat(op-node): pre-fetch receipts concurrently round 2 (#104) * remove rateLimiter for prefetch and use waitGroup * PreFetchReceipts logic * add lock * miss continue * add preFetchCache * fix unit test * fix unit test * fix unit test * fix e2e case * rollback lru cache * rollback lru cache * make code simple * maxConcurrentRequests/2 --------- Co-authored-by: Welkin --- op-node/rollup/derive/attributes.go | 2 + op-node/rollup/derive/engine_queue.go | 1 + op-node/rollup/derive/engine_queue_test.go | 17 ++++ op-node/rollup/derive/l1_traversal.go | 1 + op-node/rollup/driver/metered_l1fetcher.go | 10 +++ op-node/rollup/driver/sequencer.go | 1 + op-node/sources/caching/pre_fetch_cache.go | 90 ++++++++++++++++++++++ op-node/sources/eth_client.go | 42 +++++++--- op-node/sources/l1_client.go | 84 ++++++++++++++------ op-node/sources/receipts.go | 5 ++ op-node/testutils/mock_eth_client.go | 5 ++ op-node/testutils/mock_l1.go | 8 ++ op-program/client/l1/client.go | 8 ++ op-program/host/prefetcher/prefetcher.go | 1 + 14 files changed, 244 insertions(+), 31 deletions(-) create mode 100644 op-node/sources/caching/pre_fetch_cache.go diff --git a/op-node/rollup/derive/attributes.go b/op-node/rollup/derive/attributes.go index 3afe755c1fa2..45246d066d2a 100644 --- a/op-node/rollup/derive/attributes.go +++ b/op-node/rollup/derive/attributes.go @@ -20,7 +20,9 @@ type L1ReceiptsFetcher interface { InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) GoOrUpdatePreFetchReceipts(ctx context.Context, l1StartBlock uint64) error + ClearReceiptsCacheBefore(blockNumber uint64) } type SystemConfigL2Fetcher interface { diff --git a/op-node/rollup/derive/engine_queue.go b/op-node/rollup/derive/engine_queue.go index 36208a807c59..13e104f548b9 100644 --- a/op-node/rollup/derive/engine_queue.go +++ b/op-node/rollup/derive/engine_queue.go @@ -402,6 +402,7 @@ func (eq *EngineQueue) postProcessSafeL2() { eq.log.Debug("updated finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block) } } + eq.l1Fetcher.ClearReceiptsCacheBefore(eq.safeHead.L1Origin.Number) } func (eq *EngineQueue) logSyncProgress(reason string) { diff 
--git a/op-node/rollup/derive/engine_queue_test.go b/op-node/rollup/derive/engine_queue_test.go index 3a7a8625d700..a7881a25cdba 100644 --- a/op-node/rollup/derive/engine_queue_test.go +++ b/op-node/rollup/derive/engine_queue_test.go @@ -260,12 +260,14 @@ func TestEngineQueue_Finalize(t *testing.T) { eq.origin = refD prev.origin = refD eq.safeHead = refC1 + l1F.ExpectClearReceiptsCacheBefore(refC1.L1Origin.Number) eq.postProcessSafeL2() // now say D0 was included in E and became the new safe head eq.origin = refE prev.origin = refE eq.safeHead = refD0 + l1F.ExpectClearReceiptsCacheBefore(refD0.L1Origin.Number) eq.postProcessSafeL2() // let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E) @@ -703,16 +705,19 @@ func TestVerifyNewL1Origin(t *testing.T) { newOrigin eth.L1BlockRef expectReset bool expectedFetchBlocks map[uint64]eth.L1BlockRef + verifyPass bool }{ { name: "L1OriginBeforeUnsafeOrigin", newOrigin: refD, expectReset: false, + verifyPass: true, }, { name: "Matching", newOrigin: refF, expectReset: false, + verifyPass: true, }, { name: "BlockNumberEqualDifferentHash", @@ -723,11 +728,13 @@ func TestVerifyNewL1Origin(t *testing.T) { Time: refF.Time, }, expectReset: true, + verifyPass: false, }, { name: "UnsafeIsParent", newOrigin: refG, expectReset: false, + verifyPass: true, }, { name: "UnsafeIsParentNumberDifferentHash", @@ -738,6 +745,7 @@ func TestVerifyNewL1Origin(t *testing.T) { Time: refG.Time, }, expectReset: true, + verifyPass: false, }, { name: "UnsafeIsOlderCanonical", @@ -746,6 +754,7 @@ func TestVerifyNewL1Origin(t *testing.T) { expectedFetchBlocks: map[uint64]eth.L1BlockRef{ refF.Number: refF, }, + verifyPass: true, }, { name: "UnsafeIsOlderNonCanonical", @@ -765,6 +774,7 @@ func TestVerifyNewL1Origin(t *testing.T) { Time: refF.Time, }, }, + verifyPass: false, }, } for _, test := range tests { @@ -839,6 +849,9 @@ func TestVerifyNewL1Origin(t *testing.T) { // L1 chain reorgs so new origin is at same slot as refF but on a different fork prev.origin = test.newOrigin eq.UnsafeL2Head() + if test.verifyPass { + l1F.ExpectClearReceiptsCacheBefore(refB.Number) + } err = eq.Step(context.Background()) if test.expectReset { require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin") @@ -938,6 +951,7 @@ func TestBlockBuildingRace(t *testing.T) { // Expect initial forkchoice update eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil) + l1F.ExpectClearReceiptsCacheBefore(refA.Number) require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset") // Expect initial building update, to process the attributes we queued up @@ -1005,6 +1019,7 @@ func TestBlockBuildingRace(t *testing.T) { } eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil) + l1F.ExpectClearReceiptsCacheBefore(refA.Number) // Now complete the job, as external user of the engine _, _, err = eq.ConfirmPayload(context.Background()) require.NoError(t, err) @@ -1093,6 +1108,7 @@ func TestResetLoop(t *testing.T) { eq.engineSyncTarget = refA2 eq.safeHead = refA1 eq.finalized = refA0 + l1F.ExpectClearReceiptsCacheBefore(refA.Number) // Qeueue up the safe attributes require.Nil(t, eq.safeAttributes) @@ -1111,6 +1127,7 @@ func TestResetLoop(t *testing.T) { eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil) require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset") + l1F.ExpectClearReceiptsCacheBefore(refA.Number) // Crux of the test. Should be in a valid state after the reset. 
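// ---- Editor's note: illustration, not part of the patch -------------------
// The l1F.ExpectClearReceiptsCacheBefore(...) lines threaded through these
// tests follow the stretchr/testify mock convention (see the MockL1Source
// hunks further down): Expect* arms a one-shot expectation, and the
// production-facing method replays it, failing the test on any unexpected
// call. A minimal sketch with an illustrative MockFetcher type:
package sketch

import "github.com/stretchr/testify/mock"

type MockFetcher struct{ mock.Mock }

// Called by the code under test; testify fails the test if no matching
// expectation was armed first.
func (m *MockFetcher) ClearReceiptsCacheBefore(blockNumber uint64) {
	m.Mock.MethodCalled("ClearReceiptsCacheBefore", blockNumber)
}

// Called by the test body to arm exactly one expected invocation, which is
// why every path through eq.Step that clears the cache needs a matching
// Expect call in these tests.
func (m *MockFetcher) ExpectClearReceiptsCacheBefore(blockNumber uint64) {
	m.Mock.On("ClearReceiptsCacheBefore", blockNumber).Once()
}
// ----------------------------------------------------------------------------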
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "Should be able to step after a reset") diff --git a/op-node/rollup/derive/l1_traversal.go b/op-node/rollup/derive/l1_traversal.go index c89b27f5fd6e..6e8af25c97f5 100644 --- a/op-node/rollup/derive/l1_traversal.go +++ b/op-node/rollup/derive/l1_traversal.go @@ -20,6 +20,7 @@ import ( type L1BlockRefByNumberFetcher interface { L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) } type L1Traversal struct { diff --git a/op-node/rollup/driver/metered_l1fetcher.go b/op-node/rollup/driver/metered_l1fetcher.go index a81511bf7516..eaa76c3c8368 100644 --- a/op-node/rollup/driver/metered_l1fetcher.go +++ b/op-node/rollup/driver/metered_l1fetcher.go @@ -57,6 +57,11 @@ func (m *MeteredL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.H return m.inner.FetchReceipts(ctx, blockHash) } +func (m *MeteredL1Fetcher) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) { + defer m.recordTime("PreFetchReceipts")() + return m.inner.PreFetchReceipts(ctx, blockHash) +} + var _ derive.L1Fetcher = (*MeteredL1Fetcher)(nil) func (m *MeteredL1Fetcher) recordTime(method string) func() { @@ -71,3 +76,8 @@ func (m *MeteredL1Fetcher) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Sta defer m.recordTime("GoOrUpdatePreFetchReceipts")() return m.inner.GoOrUpdatePreFetchReceipts(ctx, l1StartBlock) } + +func (m *MeteredL1Fetcher) ClearReceiptsCacheBefore(blockNumber uint64) { + defer m.recordTime("ClearReceiptsCacheBefore")() + m.inner.ClearReceiptsCacheBefore(blockNumber) +} diff --git a/op-node/rollup/driver/sequencer.go b/op-node/rollup/driver/sequencer.go index 992706e98ad8..bd1c760bec07 100644 --- a/op-node/rollup/driver/sequencer.go +++ b/op-node/rollup/driver/sequencer.go @@ -18,6 +18,7 @@ import ( type Downloader interface { InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) } type L1OriginSelectorIface interface { diff --git a/op-node/sources/caching/pre_fetch_cache.go b/op-node/sources/caching/pre_fetch_cache.go new file mode 100644 index 000000000000..19842b1a66c7 --- /dev/null +++ b/op-node/sources/caching/pre_fetch_cache.go @@ -0,0 +1,90 @@ +package caching + +import ( + "sync" + + "github.com/ethereum/go-ethereum/common/prque" +) + +type PreFetchCache[V any] struct { + m Metrics + label string + inner map[uint64]V + queue *prque.Prque[uint64, V] + lock sync.Mutex + maxSize int +} + +func NewPreFetchCache[V any](m Metrics, label string, maxSize int) *PreFetchCache[V] { + return &PreFetchCache[V]{ + m: m, + label: label, + inner: make(map[uint64]V), + queue: prque.New[uint64, V](nil), + maxSize: maxSize, + } +} + +func (v *PreFetchCache[V]) Add(key uint64, value V) bool { + defer v.lock.Unlock() + v.lock.Lock() + if _, ok := v.inner[key]; ok { + return false + } + v.queue.Push(value, -key) + v.inner[key] = value + if v.m != nil { + v.m.CacheAdd(v.label, v.queue.Size(), false) + } + return true +} + +func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFull bool) { + defer v.lock.Unlock() + v.lock.Lock() + if _, ok := v.inner[key]; ok { + return false, false + } + if v.queue.Size() >= v.maxSize { + return 
false, true + } + v.queue.Push(value, -key) + v.inner[key] = value + if v.m != nil { + v.m.CacheAdd(v.label, v.queue.Size(), false) + } + return true, false +} + +func (v *PreFetchCache[V]) Get(key uint64) (V, bool) { + defer v.lock.Unlock() + v.lock.Lock() + value, ok := v.inner[key] + if v.m != nil { + v.m.CacheGet(v.label, ok) + } + return value, ok +} + +func (v *PreFetchCache[V]) RemoveAll() { + defer v.lock.Unlock() + v.lock.Lock() + v.inner = make(map[uint64]V) + v.queue.Reset() +} + +func (v *PreFetchCache[V]) RemoveLessThan(p uint64) (isRemoved bool) { + defer v.lock.Unlock() + v.lock.Lock() + for !v.queue.Empty() { + _, qKey := v.queue.Peek() + if -qKey < p { + v.queue.Pop() + delete(v.inner, -qKey) + isRemoved = true + continue + } + break + } + return +} diff --git a/op-node/sources/eth_client.go b/op-node/sources/eth_client.go index a93c5bb23d2f..2cdc63473169 100644 --- a/op-node/sources/eth_client.go +++ b/op-node/sources/eth_client.go @@ -105,8 +105,8 @@ type EthClient struct { // cache receipts in bundles per block hash // We cache the receipts fetching job to not lose progress when we have to retry the `Fetch` call - // common.Hash -> *receiptsFetchingJob - receiptsCache *caching.LRUCache + // common.Hash -> *receiptsFetchingJobPair + receiptsCache *caching.PreFetchCache[*receiptsFetchingJobPair] // cache transactions in bundles per block hash // common.Hash -> types.Transactions @@ -172,7 +172,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co mustBePostMerge: config.MustBePostMerge, provKind: config.RPCProviderKind, log: log, - receiptsCache: caching.NewLRUCache(metrics, "receipts", config.ReceiptsCacheSize), + receiptsCache: caching.NewPreFetchCache[*receiptsFetchingJobPair](metrics, "receipts", config.ReceiptsCacheSize), transactionsCache: caching.NewLRUCache(metrics, "txs", config.TransactionsCacheSize), headersCache: caching.NewLRUCache(metrics, "headers", config.HeadersCacheSize), payloadsCache: caching.NewLRUCache(metrics, "payloads", config.PayloadsCacheSize), @@ -357,27 +357,51 @@ func (s *EthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (* // It verifies the receipt hash in the block header against the receipt hash of the fetched receipts // to ensure that the execution engine did not fail to return any receipts. func (s *EthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { + blockInfo, receipts, err, _ := s.fetchReceiptsInner(ctx, blockHash, false) + return blockInfo, receipts, err +} + +func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Hash, isForPreFetch bool) (eth.BlockInfo, types.Receipts, error, bool) { info, txs, err := s.InfoAndTxsByHash(ctx, blockHash) if err != nil { - return nil, nil, err + return nil, nil, err, false } // Try to reuse the receipts fetcher because is caches the results of intermediate calls. This means // that if just one of many calls fail, we only retry the failed call rather than all of the calls. // The underlying fetcher uses the receipts hash to verify receipt integrity. 
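// ---- Editor's note: illustration, not part of the patch -------------------
// PreFetchCache above pushes every entry with priority -key. prque.Prque pops
// the highest priority first, and negating a uint64 key (which wraps modulo
// 2^64) reverses the ordering, so the lowest block number always sits on top.
// That is exactly what RemoveLessThan needs: it pops stale heights until it
// meets one still wanted. A small demonstration using signed priorities:
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common/prque"
)

func main() {
	q := prque.New[int64, string](nil)
	for _, n := range []int64{102, 100, 101} {
		q.Push(fmt.Sprintf("block-%d", n), -n) // negate: min-queue behaviour
	}
	value, priority := q.Pop()
	fmt.Println(value, -priority) // prints: block-100 100
}
// ----------------------------------------------------------------------------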
var job *receiptsFetchingJob - if v, ok := s.receiptsCache.Get(blockHash); ok { - job = v.(*receiptsFetchingJob) + var isFull bool + v, ok := s.receiptsCache.Get(info.NumberU64()) + if ok && v.blockHash == blockHash { + job = v.job } else { txHashes := eth.TransactionsToHashes(txs) job = NewReceiptsFetchingJob(s, s.client, s.maxBatchSize, eth.ToBlockID(info), info.ReceiptHash(), txHashes) - s.receiptsCache.Add(blockHash, job) + _, isFull = s.receiptsCache.AddIfNotFull(info.NumberU64(), &receiptsFetchingJobPair{ + blockHash: blockHash, + job: job, + }) + if isForPreFetch && isFull { + return nil, nil, nil, true + } } receipts, err := job.Fetch(ctx) if err != nil { - return nil, nil, err + return nil, nil, err, isFull } - return info, receipts, nil + return info, receipts, nil, isFull +} + +func (s *EthClient) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) { + _, _, err, isFull := s.fetchReceiptsInner(ctx, blockHash, true) + if err != nil { + return false, err + } + if isFull { + return false, nil + } + return true, nil } // GetProof returns an account proof result, with any optional requested storage proofs. diff --git a/op-node/sources/l1_client.go b/op-node/sources/l1_client.go index e746b80d844f..5c8bc95cd760 100644 --- a/op-node/sources/l1_client.go +++ b/op-node/sources/l1_client.go @@ -7,15 +7,13 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "golang.org/x/time/rate" - "github.com/ethereum-optimism/optimism/op-node/client" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/sources/caching" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" ) type L1ClientConfig struct { @@ -62,10 +60,10 @@ type L1Client struct { //ensure pre-fetch receipts only once preFetchReceiptsOnce sync.Once - //control the number of concurrent pre-fetch receipt requests. 
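// ---- Editor's note: illustration, not part of the patch -------------------
// Round 2 (this patch) drops the rate limiter in favour of batches of
// goroutines joined by a sync.WaitGroup: the loop below caps in-flight
// fetches at maxConcurrentRequests/2, and the next batch starts only after
// the previous one drains. The shape of that loop, reduced to a sketch with
// illustrative names (retries, caching, and the reorg channel elided):
package sketch

import "sync"

func prefetchRange(from, latest uint64, maxConcurrent int, fetch func(uint64)) {
	for from <= latest {
		batch := int(latest-from) + 1
		if batch > maxConcurrent {
			batch = maxConcurrent
		}
		var wg sync.WaitGroup
		for i := 0; i < batch; i++ {
			wg.Add(1)
			go func(block uint64) {
				defer wg.Done()
				fetch(block)
			}(from + uint64(i))
		}
		from += uint64(batch)
		wg.Wait() // bounds in-flight work, which the old limiter did not
	}
}
// ----------------------------------------------------------------------------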
- preFetchReceiptsRateLimiter *rate.Limiter //start block for pre-fetch receipts preFetchReceiptsStartBlockChan chan uint64 + //max concurrent requests + maxConcurrentRequests int //done chan done chan struct{} } @@ -82,7 +80,7 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con l1BlockRefsCache: caching.NewLRUCache(metrics, "blockrefs", config.L1BlockRefsCacheSize), preFetchReceiptsOnce: sync.Once{}, preFetchReceiptsStartBlockChan: make(chan uint64, 1), - preFetchReceiptsRateLimiter: rate.NewLimiter(rate.Limit(config.MaxConcurrentRequests/2), config.MaxConcurrentRequests/2), + maxConcurrentRequests: config.MaxConcurrentRequests, done: make(chan struct{}), }, nil } @@ -148,28 +146,66 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 return case currentL1Block = <-s.preFetchReceiptsStartBlockChan: s.log.Debug("pre-fetching receipts currentL1Block changed", "block", currentL1Block) + s.receiptsCache.RemoveAll() default: - blockInfo, err := s.L1BlockRefByNumber(ctx, currentL1Block) + blockRef, err := s.L1BlockRefByLabel(ctx, eth.Unsafe) if err != nil { - s.log.Debug("failed to fetch next block info", "err", err) + s.log.Debug("failed to fetch latest block ref", "err", err) time.Sleep(3 * time.Second) continue } - waitErr := s.preFetchReceiptsRateLimiter.Wait(ctx) - if waitErr != nil { - s.log.Warn("failed to wait pre-fetch receipts rateLimiter", "err", waitErr) + + if currentL1Block > blockRef.Number { + s.log.Debug("current block height exceeds the latest block height of l1, will wait for a while.", "currentL1Block", currentL1Block, "l1Latest", blockRef.Number) + time.Sleep(3 * time.Second) continue } - go func(ctx context.Context, blockInfo eth.L1BlockRef) { - _, _, err = s.FetchReceipts(ctx, blockInfo.Hash) - if err != nil { - s.log.Warn("failed to pre-fetch receipts", "err", err) - return - } - s.log.Debug("pre-fetching receipts", "block", currentL1Block) - }(ctx, blockInfo) - currentL1Block = currentL1Block + 1 + var taskCount int + maxConcurrent := s.maxConcurrentRequests / 2 + if blockRef.Number-currentL1Block >= uint64(maxConcurrent) { + taskCount = maxConcurrent + } else { + taskCount = int(blockRef.Number-currentL1Block) + 1 + } + + var wg sync.WaitGroup + for i := 0; i < taskCount; i++ { + wg.Add(1) + go func(ctx context.Context, blockNumber uint64) { + defer wg.Done() + for { + select { + case <-s.done: + return + default: + if _, ok := s.receiptsCache.Get(blockNumber); ok { + return + } + blockInfo, err := s.L1BlockRefByNumber(ctx, blockNumber) + if err != nil { + s.log.Debug("failed to fetch block ref", "err", err, "blockNumber", blockNumber) + time.Sleep(1 * time.Second) + continue + } + isSuccess, err := s.PreFetchReceipts(ctx, blockInfo.Hash) + if err != nil { + s.log.Warn("failed to pre-fetch receipts", "err", err) + return + } + if !isSuccess { + s.log.Debug("pre fetch receipts fail without error,need retry", "blockHash", blockInfo.Hash, "blockNumber", blockNumber) + time.Sleep(1 * time.Second) + continue + } + s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number) + break + } + } + }(ctx, currentL1Block) + currentL1Block = currentL1Block + 1 + } + wg.Wait() } } }() @@ -177,6 +213,10 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 return nil } +func (s *L1Client) ClearReceiptsCacheBefore(blockNumber uint64) { + s.receiptsCache.RemoveLessThan(blockNumber) +} + func (s *L1Client) Close() { close(s.done) s.EthClient.Close() diff --git 
a/op-node/sources/receipts.go b/op-node/sources/receipts.go index 2f2f7d62a403..603b9d36322e 100644 --- a/op-node/sources/receipts.go +++ b/op-node/sources/receipts.go @@ -359,6 +359,11 @@ type rpcClient interface { BatchCallContext(ctx context.Context, b []rpc.BatchElem) error } +type receiptsFetchingJobPair struct { + blockHash common.Hash + job *receiptsFetchingJob +} + // receiptsFetchingJob runs the receipt fetching for a specific block, // and can re-run and adapt based on the fetching method preferences and errors communicated with the requester. type receiptsFetchingJob struct { diff --git a/op-node/testutils/mock_eth_client.go b/op-node/testutils/mock_eth_client.go index 9615d85b6cf2..3f7cdb52991f 100644 --- a/op-node/testutils/mock_eth_client.go +++ b/op-node/testutils/mock_eth_client.go @@ -101,6 +101,11 @@ func (m *MockEthClient) FetchReceipts(ctx context.Context, blockHash common.Hash return *out[0].(*eth.BlockInfo), out[1].(types.Receipts), *out[2].(*error) } +func (m *MockEthClient) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) { + out := m.Mock.MethodCalled("PreFetchReceipts", blockHash) + return *out[0].(*bool), *out[1].(*error) +} + func (m *MockEthClient) ExpectFetchReceipts(hash common.Hash, info eth.BlockInfo, receipts types.Receipts, err error) { m.Mock.On("FetchReceipts", hash).Once().Return(&info, receipts, &err) } diff --git a/op-node/testutils/mock_l1.go b/op-node/testutils/mock_l1.go index 746501bf3f1c..a6c8b6d9720e 100644 --- a/op-node/testutils/mock_l1.go +++ b/op-node/testutils/mock_l1.go @@ -46,3 +46,11 @@ func (m *MockL1Source) GoOrUpdatePreFetchReceipts(ctx context.Context, l1StartBl func (m *MockL1Source) ExpectGoOrUpdatePreFetchReceipts(background context.Context, number uint64, err error) { m.Mock.On("GoOrUpdatePreFetchReceipts", background, number).Once().Return(&err) } + +func (m *MockL1Source) ClearReceiptsCacheBefore(blockNumber uint64) { + m.Mock.MethodCalled("ClearReceiptsCacheBefore", blockNumber) +} + +func (m *MockL1Source) ExpectClearReceiptsCacheBefore(blockNumber uint64) { + m.Mock.On("ClearReceiptsCacheBefore", blockNumber).Once() +} diff --git a/op-program/client/l1/client.go b/op-program/client/l1/client.go index 751c39bf99c7..1ddb67094b87 100644 --- a/op-program/client/l1/client.go +++ b/op-program/client/l1/client.go @@ -76,6 +76,10 @@ func (o *OracleL1Client) FetchReceipts(ctx context.Context, blockHash common.Has return info, rcpts, nil } +func (o *OracleL1Client) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) { + return false, nil +} + func (o *OracleL1Client) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) { info, txs := o.oracle.TransactionsByBlockHash(hash) return info, txs, nil @@ -84,3 +88,7 @@ func (o *OracleL1Client) InfoAndTxsByHash(ctx context.Context, hash common.Hash) func (o *OracleL1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1StartBlock uint64) error { return o.oracle.GoOrUpdatePreFetchReceipts(ctx, l1StartBlock) } + +func (o *OracleL1Client) ClearReceiptsCacheBefore(blockNumber uint64) { + //do nothing +} diff --git a/op-program/host/prefetcher/prefetcher.go b/op-program/host/prefetcher/prefetcher.go index 07f1877161e6..cc54111b9927 100644 --- a/op-program/host/prefetcher/prefetcher.go +++ b/op-program/host/prefetcher/prefetcher.go @@ -23,6 +23,7 @@ type L1Source interface { InfoByHash(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, error) InfoAndTxsByHash(ctx context.Context, blockHash 
common.Hash) (eth.BlockInfo, types.Transactions, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) + PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) } type L2Source interface { From 40b39d352bc1e51ee50d2c28992f820667e939ca Mon Sep 17 00:00:00 2001 From: Owen <103096885+owen-reorg@users.noreply.github.com> Date: Wed, 3 Jan 2024 10:15:49 +0800 Subject: [PATCH 05/11] doc: update v0.2.4 changelog (#107) * doc: update v0.2.4 changelog * update --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 063882cc0b56..987b25b342c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ This is a minor release and upgrading is optional. - #87: optimize(op-node): make block produce stable when L1 latency unstable - #89: feat(op-node): add opBNB bootnodes - #94: fix(op-node/op-batcher): fallbackClient should ignore ethereum.NotFound error +- #100: feature(op-node): pre-fetch receipts concurrently +- #101: optimize(op-node): continue optimizing sequencer step schedule +- #104: feat(op-node): pre-fetch receipts concurrently round 2 +- #106: optimize: extended expire time for sequencer block broadcasting ### Docker Images From 635e668cf0cd82f2596e7373942a968d310f3213 Mon Sep 17 00:00:00 2001 From: bnoieh <135800952+bnoieh@users.noreply.github.com> Date: Fri, 5 Jan 2024 12:26:52 +0800 Subject: [PATCH 06/11] optimize(op-node): increase catching up speed when sequencer lagging (#108) * optimize(op-node): increase catching up speed when sequencer lagging * fix op-node ci --- op-node/rollup/driver/origin_selector.go | 1 + op-node/rollup/driver/sequencer.go | 24 +++++++++++++++++++++--- op-node/rollup/driver/sequencer_test.go | 3 ++- 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/op-node/rollup/driver/origin_selector.go b/op-node/rollup/driver/origin_selector.go index 6704b6d0abdc..ad0d16d49a55 100644 --- a/op-node/rollup/driver/origin_selector.go +++ b/op-node/rollup/driver/origin_selector.go @@ -81,6 +81,7 @@ func (los *L1OriginSelector) FindL1Origin(ctx context.Context, l2Head eth.L2Bloc _, _, err = los.l1.FetchReceipts(receiptsCtx, nextOrigin.Hash) if err != nil { receiptsCached = false + log.Warn("Fetch receipts cache missed when sequencer building block") } // If the next L2 block time is greater than the next origin block's time, we can choose to diff --git a/op-node/rollup/driver/sequencer.go b/op-node/rollup/driver/sequencer.go index bd1c760bec07..44e3a3d0bbaa 100644 --- a/op-node/rollup/driver/sequencer.go +++ b/op-node/rollup/driver/sequencer.go @@ -15,6 +15,12 @@ import ( "github.com/ethereum-optimism/optimism/op-node/rollup/derive" ) +// When block produce is interrupted by high L1 latency, sequencer will build a full block periodically to avoid chain stuck +const buildFullBlockInterval = 20 + +// When block produce is lagging exceed lagTimeWindow, sequencer will set attrs.NoTxPool to true to quickly catch up +const lagTimeWindow = 2 * time.Minute + type Downloader interface { InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) @@ -47,7 +53,7 @@ type Sequencer struct { nextAction time.Time - // if accEmptyBlocks>10, will delay nextAction 600ms for full block building + // if accEmptyBlocks > buildFullBlockInterval, will delay nextAction 600ms for full block building accEmptyBlocks int } @@ -93,7 +99,19 @@ func (d *Sequencer) 
StartBuildingBlock(ctx context.Context) error { // empty blocks (other than the L1 info deposit and any user deposits). We handle this by // setting NoTxPool to true, which will cause the Sequencer to not include any transactions // from the transaction pool. - attrs.NoTxPool = uint64(attrs.Timestamp) > l1Origin.Time+d.config.MaxSequencerDrift + if uint64(attrs.Timestamp) > l1Origin.Time+d.config.MaxSequencerDrift { + attrs.NoTxPool = true + } else { + // This is short term solution to increase sequencer catching up speed. + // Long term solution should optimize op-geth payload building work flow. + attrsTime := time.Unix(int64(attrs.Timestamp), 0) + isCatchingUp := time.Since(attrsTime) > lagTimeWindow + if isCatchingUp && (d.accEmptyBlocks < buildFullBlockInterval) { + attrs.NoTxPool = true + } else { + attrs.NoTxPool = false + } + } d.log.Debug("prepared attributes for new block", "num", l2Head.Number+1, "time", uint64(attrs.Timestamp), @@ -256,7 +274,7 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context) (*eth.ExecutionP } } else { parent, buildingID, _ := d.engine.BuildingPayload() // we should have a new payload ID now that we're building a block - if d.accEmptyBlocks > 10 { + if d.accEmptyBlocks >= buildFullBlockInterval { d.nextAction = d.timeNow().Add(600 * time.Millisecond) d.accEmptyBlocks = 0 d.log.Info("sequencer delay next action 600ms and reset accEmptyBlocks") diff --git a/op-node/rollup/driver/sequencer_test.go b/op-node/rollup/driver/sequencer_test.go index 6bc0293e5ef9..73a243357bb1 100644 --- a/op-node/rollup/driver/sequencer_test.go +++ b/op-node/rollup/driver/sequencer_test.go @@ -378,5 +378,6 @@ func TestSequencerChaosMonkey(t *testing.T) { require.Less(t, l2Head.Time-l1Times[l2Head.L1Origin], uint64(100), "The L1 origin time is close to the L2 time") require.Less(t, clockTime.Sub(time.Unix(int64(l2Head.Time), 0)).Abs(), 2*time.Second, "L2 time is accurate, within 2 seconds of wallclock") require.Greater(t, engControl.avgBuildingTime(), time.Second, "With 2 second block time and 1 second error backoff and healthy-on-average errors, building time should at least be a second") - require.Greater(t, engControl.avgTxsPerBlock(), 3.0, "We expect at least 1 system tx per block, but with a mocked 0-10 txs we expect an higher avg") + // sequencer catching up optimization will reduce the avgTxs per block, but still should be greater than 1 + require.Greater(t, engControl.avgTxsPerBlock(), 1.0, "We expect at least 1 system tx per block, but with a mocked 0-10 txs we expect an higher avg") } From ffec6c812b9ff1783bda2cf2bd3ae3ad53cdd05e Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Tue, 9 Jan 2024 10:54:50 +0800 Subject: [PATCH 07/11] feat(op-batcher/op-proposer): add InstrumentedClient (#109) * feat(op-batcher/op-proposer): add InstrumentedClient for batcher/proposer/txmgr * fix * fix noopMetric npe --------- Co-authored-by: Welkin --- op-batcher/batcher/config.go | 3 +- op-batcher/batcher/driver.go | 2 + op-batcher/metrics/metrics.go | 70 +++++++++++ op-batcher/metrics/noop.go | 5 + op-proposer/metrics/metrics.go | 69 ++++++++++ op-proposer/metrics/noop.go | 5 + op-proposer/proposer/l2_output_submitter.go | 1 + op-service/client/ethclient.go | 8 +- op-service/client/fallback_client.go | 8 ++ op-service/client/metrics.go | 133 ++++++++++++++++++++ op-service/txmgr/cli.go | 4 +- op-service/txmgr/metrics/noop.go | 4 + op-service/txmgr/metrics/tx_metrics.go | 90 +++++++++++-- 13 files changed, 385 insertions(+), 17 
deletions(-) create mode 100644 op-service/client/metrics.go diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 7ca9ef0f1304..b4c1bd178372 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -5,7 +5,6 @@ import ( "github.com/ethereum-optimism/optimism/op-service/client" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" "github.com/urfave/cli" @@ -25,7 +24,7 @@ type Config struct { log log.Logger metr metrics.Metricer L1Client client.EthClient - L2Client *ethclient.Client + L2Client client.EthClient RollupNode *sources.RollupClient TxManager txmgr.TxManager diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 5bbd884cff15..774fd4fe71c0 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -57,11 +57,13 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metri if err != nil { return nil, err } + l1Client = opclient.NewInstrumentedClient(l1Client, m) l2Client, err := opclient.DialEthClientWithTimeout(ctx, cfg.L2EthRpc, opclient.DefaultDialTimeout) if err != nil { return nil, err } + l2Client = opclient.NewInstrumentedClient(l2Client, m) rollupClient, err := opclient.DialRollupClientWithTimeout(ctx, cfg.RollupRpc, opclient.DefaultDialTimeout) if err != nil { diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go index 466432a01a86..c450bf399348 100644 --- a/op-batcher/metrics/metrics.go +++ b/op-batcher/metrics/metrics.go @@ -2,8 +2,12 @@ package metrics import ( "context" + "errors" + "fmt" + "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -17,6 +21,7 @@ import ( ) const Namespace = "op_batcher" +const RPCClientSubsystem = "rpc_client" type Metricer interface { RecordInfo(version string) @@ -43,6 +48,7 @@ type Metricer interface { RecordBatchTxFailed() Document() []opmetrics.DocumentedMetric + client.Metricer } type Metrics struct { @@ -74,6 +80,10 @@ type Metrics struct { channelOutputBytesTotal prometheus.Counter batcherTxEvs opmetrics.EventVec + + RPCClientRequestsTotal *prometheus.CounterVec + RPCClientRequestDurationSeconds *prometheus.HistogramVec + RPCClientResponsesTotal *prometheus.CounterVec } var _ Metricer = (*Metrics)(nil) @@ -174,6 +184,33 @@ func NewMetrics(procName string) *Metrics { }), batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}), + + RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: RPCClientSubsystem, + Name: "requests_total", + Help: "Total RPC requests initiated by the op-batcher's RPC client", + }, []string{ + "method", + }), + RPCClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: ns, + Subsystem: RPCClientSubsystem, + Name: "request_duration_seconds", + Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + Help: "Histogram of RPC client request durations", + }, []string{ + "method", + }), + RPCClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Subsystem: RPCClientSubsystem, + Name: "responses_total", + Help: "Total RPC request responses received by the op-batcher's RPC client", + }, []string{ + "method", + "error", + }), } } @@ -296,6 +333,39 @@ func (m *Metrics) RecordBatchTxFailed() { 
m.batcherTxEvs.Record(TxStageFailed)
 }
 
+func (m *Metrics) RecordRPCClientRequest(method string) func(err error) {
+	m.RPCClientRequestsTotal.WithLabelValues(method).Inc()
+	timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method))
+	return func(err error) {
+		m.RecordRPCClientResponse(method, err)
+		timer.ObserveDuration()
+	}
+}
+
+// RecordRPCClientResponse records an RPC response. It will
+// convert the passed-in error into something metrics friendly.
+// Nil errors get converted into <nil>, RPC errors are converted
+// into rpc_<error code>, HTTP errors are converted into
+// http_<status code>, and everything else is converted into
+// <unknown>.
+func (m *Metrics) RecordRPCClientResponse(method string, err error) {
+	var errStr string
+	var rpcErr rpc.Error
+	var httpErr rpc.HTTPError
+	if err == nil {
+		errStr = "<nil>"
+	} else if errors.As(err, &rpcErr) {
+		errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
+	} else if errors.As(err, &httpErr) {
+		errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
+	} else if errors.Is(err, ethereum.NotFound) {
+		errStr = "<not found>"
+	} else {
+		errStr = "<unknown>"
+	}
+	m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
+}
+
 // estimateBatchSize estimates the size of the batch
 func estimateBatchSize(block *types.Block) uint64 {
 	size := uint64(70) // estimated overhead of batch metadata
diff --git a/op-batcher/metrics/noop.go b/op-batcher/metrics/noop.go
index 6293fcf7c876..dd917b6e628e 100644
--- a/op-batcher/metrics/noop.go
+++ b/op-batcher/metrics/noop.go
@@ -38,3 +38,8 @@ func (*noopMetrics) RecordBatchTxFailed() {}
 
 func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) {
 }
+
+func (m *noopMetrics) RecordRPCClientRequest(method string) func(err error) {
+	return func(err error) {
+	}
+}
diff --git a/op-proposer/metrics/metrics.go b/op-proposer/metrics/metrics.go
index 0f549782424d..bcc5b79c357d 100644
--- a/op-proposer/metrics/metrics.go
+++ b/op-proposer/metrics/metrics.go
@@ -2,8 +2,12 @@ package metrics
 
 import (
 	"context"
+	"errors"
+	"fmt"
 
+	"github.com/ethereum-optimism/optimism/op-service/client"
 	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/rpc"
 
 	"github.com/ethereum-optimism/optimism/op-node/eth"
 
@@ -16,6 +20,7 @@ import (
 )
 
 const Namespace = "op_proposer"
+const RPCClientSubsystem = "rpc_client"
 
 type Metricer interface {
 	RecordInfo(version string)
@@ -28,6 +33,7 @@ type Metricer interface {
 	txmetrics.TxMetricer
 
 	RecordL2BlocksProposed(l2ref eth.L2BlockRef)
+	client.Metricer
 }
 
 type Metrics struct {
@@ -40,6 +46,10 @@ type Metrics struct {
 
 	info prometheus.GaugeVec
 	up   prometheus.Gauge
+
+	RPCClientRequestsTotal          *prometheus.CounterVec
+	RPCClientRequestDurationSeconds *prometheus.HistogramVec
+	RPCClientResponsesTotal         *prometheus.CounterVec
 }
 
 var _ Metricer = (*Metrics)(nil)
@@ -73,6 +83,32 @@ func NewMetrics(procName string) *Metrics {
 			Name: "up",
 			Help: "1 if the op-proposer has finished starting up",
 		}),
+		RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: ns,
+			Subsystem: RPCClientSubsystem,
+			Name:      "requests_total",
+			Help:      "Total RPC requests initiated by the op-proposer's RPC client",
+		}, []string{
+			"method",
+		}),
+		RPCClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: ns,
+			Subsystem: RPCClientSubsystem,
+			Name:      "request_duration_seconds",
+			Buckets:   []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
+			Help:      "Histogram of RPC client request durations",
+		}, []string{
+			"method",
+		}),
+		RPCClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: ns,
+			Subsystem: RPCClientSubsystem,
+			Name:      "responses_total",
+			Help:      "Total RPC request responses received by the op-proposer's RPC client",
+		}, []string{
+			"method",
+			"error",
+		}),
 	}
 }
 
@@ -109,3 +145,36 @@ func (m *Metrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {
 func (m *Metrics) Document() []opmetrics.DocumentedMetric {
 	return m.factory.Document()
 }
+
+func (m *Metrics) RecordRPCClientRequest(method string) func(err error) {
+	m.RPCClientRequestsTotal.WithLabelValues(method).Inc()
+	timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method))
+	return func(err error) {
+		m.RecordRPCClientResponse(method, err)
+		timer.ObserveDuration()
+	}
+}
+
+// RecordRPCClientResponse records an RPC response. It will
+// convert the passed-in error into something metrics friendly.
+// Nil errors get converted into <nil>, RPC errors are converted
+// into rpc_<error code>, HTTP errors are converted into
+// http_<status code>, and everything else is converted into
+// <unknown>.
+func (m *Metrics) RecordRPCClientResponse(method string, err error) {
+	var errStr string
+	var rpcErr rpc.Error
+	var httpErr rpc.HTTPError
+	if err == nil {
+		errStr = "<nil>"
+	} else if errors.As(err, &rpcErr) {
+		errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
+	} else if errors.As(err, &httpErr) {
+		errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
+	} else if errors.Is(err, ethereum.NotFound) {
+		errStr = "<not found>"
+	} else {
+		errStr = "<unknown>"
+	}
+	m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
+}
diff --git a/op-proposer/metrics/noop.go b/op-proposer/metrics/noop.go
index a353a5ff5335..23acbd2d410f 100644
--- a/op-proposer/metrics/noop.go
+++ b/op-proposer/metrics/noop.go
@@ -20,3 +20,8 @@ func (*noopMetrics) RecordL2BlocksProposed(l2ref eth.L2BlockRef) {}
 
 func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) {
 }
+
+func (m *noopMetrics) RecordRPCClientRequest(method string) func(err error) {
+	return func(err error) {
+	}
+}
diff --git a/op-proposer/proposer/l2_output_submitter.go b/op-proposer/proposer/l2_output_submitter.go
index 89f6c06755b5..366b88e3f0ea 100644
--- a/op-proposer/proposer/l2_output_submitter.go
+++ b/op-proposer/proposer/l2_output_submitter.go
@@ -162,6 +162,7 @@ func NewL2OutputSubmitterConfigFromCLIConfig(cfg CLIConfig, l log.Logger, m metr
 	if err != nil {
 		return nil, err
 	}
+	l1Client = opclient.NewInstrumentedClient(l1Client, m)
 
 	rollupClient, err := opclient.DialRollupClientWithTimeout(ctx, cfg.RollupRpc, opclient.DefaultDialTimeout)
 	if err != nil {
diff --git a/op-service/client/ethclient.go b/op-service/client/ethclient.go
index e5b4d4db0ee1..ce805848d469 100644
--- a/op-service/client/ethclient.go
+++ b/op-service/client/ethclient.go
@@ -2,12 +2,13 @@ package client
 
 import (
 	"context"
+	"math/big"
+	"time"
+
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/log"
-	"math/big"
-	"time"
 
 	"github.com/ethereum/go-ethereum/ethclient"
 )
 
 // DialEthClientWithTimeout attempts to dial the L1 provider using the provided
 // URL. If the dial doesn't complete within defaultDialTimeout seconds, this
 // method will return an error.
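// ---- Editor's note: illustration, not part of the patch -------------------
// RecordRPCClientRequest above returns a closure so a single call site can
// bump the request counter up front and then, in one deferred call, record
// duration and outcome together. The idiom in isolation (prometheus types
// elided; recorder and observe are illustrative names):
package sketch

import "time"

type recorder struct {
	observe func(method string, d time.Duration, err error)
}

func (r *recorder) recordRequest(method string) func(err error) {
	start := time.Now() // also the point where requests_total is incremented
	return func(err error) {
		r.observe(method, time.Since(start), err)
	}
}

func (r *recorder) instrumented(method string, call func() error) error {
	done := r.recordRequest(method)
	err := call()
	done(err) // one point that sees both latency and the error label
	return err
}
// ----------------------------------------------------------------------------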
-func DialEthClientWithTimeout(ctx context.Context, url string, timeout time.Duration) (*ethclient.Client, error) { +func DialEthClientWithTimeout(ctx context.Context, url string, timeout time.Duration) (EthClient, error) { ctxt, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -66,5 +67,6 @@ type EthClient interface { PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) Close() } diff --git a/op-service/client/fallback_client.go b/op-service/client/fallback_client.go index e1ab64afde5d..c65123df5495 100644 --- a/op-service/client/fallback_client.go +++ b/op-service/client/fallback_client.go @@ -152,6 +152,14 @@ func (l *FallbackClient) CallContract(ctx context.Context, call ethereum.CallMsg return contract, err } +func (l *FallbackClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + block, err := (*l.currentClient.Load()).BlockByNumber(ctx, number) + if err != nil { + l.handleErr(err, "BlockByNumber") + } + return block, err +} + func (l *FallbackClient) Close() { l.mx.Lock() defer l.mx.Unlock() diff --git a/op-service/client/metrics.go b/op-service/client/metrics.go new file mode 100644 index 000000000000..f76c1e372598 --- /dev/null +++ b/op-service/client/metrics.go @@ -0,0 +1,133 @@ +package client + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// InstrumentedClient is an Ethereum client that tracks +// Prometheus metrics for each call. +type InstrumentedClient struct { + c EthClient + m Metricer +} + +type Metricer interface { + RecordRPCClientRequest(method string) func(err error) +} + +// NewInstrumentedClient creates a new instrumented client. It takes +// a concrete EthClient to prevent people from passing in an already +// instrumented client. 
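// ---- Editor's note: illustration, not part of the patch -------------------
// InstrumentedClient is a decorator: it satisfies the same EthClient
// interface it wraps, which is why call sites only need the one-line change
// `l1Client = opclient.NewInstrumentedClient(l1Client, m)` seen in the
// batcher and proposer hunks above. The pattern in miniature, with
// illustrative names:
package sketch

type Pinger interface{ Ping() error }

type meteredPinger struct {
	inner  Pinger
	record func(method string) func(err error)
}

func (p *meteredPinger) Ping() error {
	done := p.record("ping") // count the call and start the timer
	err := p.inner.Ping()
	done(err) // observe duration plus a metrics-friendly error label
	return err
}

// WithMetrics decorates any Pinger; callers keep programming against Pinger.
func WithMetrics(inner Pinger, record func(string) func(error)) Pinger {
	return &meteredPinger{inner: inner, record: record}
}
// ----------------------------------------------------------------------------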
+func NewInstrumentedClient(c EthClient, m Metricer) EthClient { + return &InstrumentedClient{ + c: c, + m: m, + } +} + +func (ic *InstrumentedClient) Close() { + ic.c.Close() +} + +func (ic *InstrumentedClient) ChainID(ctx context.Context) (*big.Int, error) { + return instrument2[*big.Int](ic.m, "eth_chainId", func() (*big.Int, error) { + return ic.c.ChainID(ctx) + }) +} + +func (ic *InstrumentedClient) BlockNumber(ctx context.Context) (uint64, error) { + return instrument2[uint64](ic.m, "eth_blockNumber", func() (uint64, error) { + return ic.c.BlockNumber(ctx) + }) +} + +func (ic *InstrumentedClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return instrument2[*types.Header](ic.m, "eth_getHeaderByNumber", func() (*types.Header, error) { + return ic.c.HeaderByNumber(ctx, number) + }) +} + +func (ic *InstrumentedClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + return instrument2[*types.Receipt](ic.m, "eth_getTransactionReceipt", func() (*types.Receipt, error) { + return ic.c.TransactionReceipt(ctx, txHash) + }) +} + +func (ic *InstrumentedClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + return instrument2[*big.Int](ic.m, "eth_getBalance", func() (*big.Int, error) { + return ic.c.BalanceAt(ctx, account, blockNumber) + }) +} + +func (ic *InstrumentedClient) StorageAt(ctx context.Context, account common.Address, key common.Hash, blockNumber *big.Int) ([]byte, error) { + return instrument2[[]byte](ic.m, "eth_getStorageAt", func() ([]byte, error) { + return ic.c.StorageAt(ctx, account, key, blockNumber) + }) +} + +func (ic *InstrumentedClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { + return instrument2[[]byte](ic.m, "eth_getCode", func() ([]byte, error) { + return ic.c.CodeAt(ctx, account, blockNumber) + }) +} + +func (ic *InstrumentedClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + return instrument2[uint64](ic.m, "eth_getTransactionCount", func() (uint64, error) { + return ic.c.NonceAt(ctx, account, blockNumber) + }) +} + +func (ic *InstrumentedClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + return instrument2[uint64](ic.m, "eth_getTransactionCount", func() (uint64, error) { + return ic.c.PendingNonceAt(ctx, account) + }) +} + +func (ic *InstrumentedClient) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + return instrument2[[]byte](ic.m, "eth_call", func() ([]byte, error) { + return ic.c.CallContract(ctx, msg, blockNumber) + }) +} + +func (ic *InstrumentedClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { + return instrument2[uint64](ic.m, "eth_estimateGas", func() (uint64, error) { + return ic.c.EstimateGas(ctx, msg) + }) +} + +func (ic *InstrumentedClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + return instrument1(ic.m, "eth_sendRawTransaction", func() error { + return ic.c.SendTransaction(ctx, tx) + }) +} + +func (ic *InstrumentedClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + return instrument2[*big.Int](ic.m, "eth_maxPriorityFeePerGas", func() (*big.Int, error) { + return ic.c.SuggestGasTipCap(ctx) + }) +} + +func (ic *InstrumentedClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return instrument2[*types.Block](ic.m, "eth_getBlockByNumber", 
func() (*types.Block, error) { + return ic.c.BlockByNumber(ctx, number) + }) +} + +func instrument1(m Metricer, name string, cb func() error) error { + record := m.RecordRPCClientRequest(name) + err := cb() + record(err) + return err +} + +func instrument2[O any](m Metricer, name string, cb func() (O, error)) (O, error) { + record := m.RecordRPCClientRequest(name) + res, err := cb() + record(err) + return res, err +} diff --git a/op-service/txmgr/cli.go b/op-service/txmgr/cli.go index 34cfd981ddeb..67e1381e722f 100644 --- a/op-service/txmgr/cli.go +++ b/op-service/txmgr/cli.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" - txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics" "math/big" "time" + txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics" + opservice "github.com/ethereum-optimism/optimism/op-service" service_client "github.com/ethereum-optimism/optimism/op-service/client" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" @@ -186,6 +187,7 @@ func NewConfig(cfg CLIConfig, l log.Logger, m txmetrics.TxMetricer) (Config, err if err != nil { return Config{}, fmt.Errorf("could not dial eth client: %w", err) } + l1 = service_client.NewInstrumentedClient(l1, m) ctx, cancel = context.WithTimeout(context.Background(), cfg.NetworkTimeout) defer cancel() diff --git a/op-service/txmgr/metrics/noop.go b/op-service/txmgr/metrics/noop.go index 82a1642afd33..bb56db02b5fa 100644 --- a/op-service/txmgr/metrics/noop.go +++ b/op-service/txmgr/metrics/noop.go @@ -12,3 +12,7 @@ func (*NoopTxMetrics) TxConfirmed(*types.Receipt) {} func (*NoopTxMetrics) TxPublished(string) {} func (*NoopTxMetrics) RPCError() {} func (m *NoopTxMetrics) RecordL1UrlSwitchEvt(url string) {} +func (m *NoopTxMetrics) RecordRPCClientRequest(method string) func(err error) { + return func(err error) { + } +} diff --git a/op-service/txmgr/metrics/tx_metrics.go b/op-service/txmgr/metrics/tx_metrics.go index bd7713522233..305fbf8711e2 100644 --- a/op-service/txmgr/metrics/tx_metrics.go +++ b/op-service/txmgr/metrics/tx_metrics.go @@ -1,10 +1,15 @@ package metrics import ( + "errors" + "fmt" + "github.com/ethereum-optimism/optimism/op-service/client" "github.com/ethereum-optimism/optimism/op-service/metrics" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rpc" "github.com/prometheus/client_golang/prometheus" ) @@ -18,20 +23,24 @@ type TxMetricer interface { TxPublished(string) RPCError() client.FallbackClientMetricer + client.Metricer } type TxMetrics struct { - TxL1GasFee prometheus.Gauge - txFees prometheus.Counter - TxGasBump prometheus.Gauge - txFeeHistogram prometheus.Histogram - LatencyConfirmedTx prometheus.Gauge - currentNonce prometheus.Gauge - pendingTxs prometheus.Gauge - txPublishError *prometheus.CounterVec - publishEvent metrics.Event - confirmEvent metrics.EventVec - rpcError prometheus.Counter + TxL1GasFee prometheus.Gauge + txFees prometheus.Counter + TxGasBump prometheus.Gauge + txFeeHistogram prometheus.Histogram + LatencyConfirmedTx prometheus.Gauge + currentNonce prometheus.Gauge + pendingTxs prometheus.Gauge + txPublishError *prometheus.CounterVec + publishEvent metrics.Event + confirmEvent metrics.EventVec + rpcError prometheus.Counter + RPCClientRequestsTotal *prometheus.CounterVec + RPCClientRequestDurationSeconds *prometheus.HistogramVec + RPCClientResponsesTotal *prometheus.CounterVec *client.FallbackClientMetrics } @@ -108,6 +117,32 @@ 
func MakeTxMetrics(ns string, factory metrics.Factory) TxMetrics {
 			Subsystem: "txmgr",
 		}),
 		FallbackClientMetrics: client.NewFallbackClientMetrics(ns, factory),
+		RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: ns,
+			Subsystem: "txmgr_rpc_client",
+			Name:      "requests_total",
+			Help:      "Total RPC requests initiated by the txmgr's RPC client",
+		}, []string{
+			"method",
+		}),
+		RPCClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: ns,
+			Subsystem: "txmgr_rpc_client",
+			Name:      "request_duration_seconds",
+			Buckets:   []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
+			Help:      "Histogram of RPC client request durations",
+		}, []string{
+			"method",
+		}),
+		RPCClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
+			Namespace: ns,
+			Subsystem: "txmgr_rpc_client",
+			Name:      "responses_total",
+			Help:      "Total RPC request responses received by the txmgr's RPC client",
+		}, []string{
+			"method",
+			"error",
+		}),
 	}
 }
 
@@ -148,3 +183,36 @@ func (t *TxMetrics) TxPublished(errString string) {
 func (t *TxMetrics) RPCError() {
 	t.rpcError.Inc()
 }
+
+func (t *TxMetrics) RecordRPCClientRequest(method string) func(err error) {
+	t.RPCClientRequestsTotal.WithLabelValues(method).Inc()
+	timer := prometheus.NewTimer(t.RPCClientRequestDurationSeconds.WithLabelValues(method))
+	return func(err error) {
+		t.RecordRPCClientResponse(method, err)
+		timer.ObserveDuration()
+	}
+}
+
+// RecordRPCClientResponse records an RPC response. It will
+// convert the passed-in error into something metrics friendly.
+// Nil errors get converted into <nil>, RPC errors are converted
+// into rpc_<error code>, HTTP errors are converted into
+// http_<status code>, and everything else is converted into
+// <unknown>.
+func (t *TxMetrics) RecordRPCClientResponse(method string, err error) {
+	var errStr string
+	var rpcErr rpc.Error
+	var httpErr rpc.HTTPError
+	if err == nil {
+		errStr = "<nil>"
+	} else if errors.As(err, &rpcErr) {
+		errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
+	} else if errors.As(err, &httpErr) {
+		errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
+	} else if errors.Is(err, ethereum.NotFound) {
+		errStr = "<not found>"
+	} else {
+		errStr = "<unknown>"
+	}
+	t.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
+}

From 71b17cc4434ff9f0e0da02df886ed54bc05f93f6 Mon Sep 17 00:00:00 2001
From: bnoieh <135800952+bnoieh@users.noreply.github.com>
Date: Thu, 11 Jan 2024 20:02:26 +0800
Subject: [PATCH 08/11] fix(op-node): remove 3s stepCtx for sequencer (#111)

---
 op-node/rollup/driver/state.go | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go
index b384b233ad68..883f4ebefb60 100644
--- a/op-node/rollup/driver/state.go
+++ b/op-node/rollup/driver/state.go
@@ -369,13 +369,7 @@ func (s *Driver) eventLoop() {
 		case <-stepReqCh:
 			s.metrics.SetDerivationIdle(false)
 			s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
-			stepCtx := context.Background()
-			if s.driverConfig.SequencerEnabled && !s.driverConfig.SequencerStopped && s.driverConfig.SequencerPriority {
-				var cancelStep context.CancelFunc
-				stepCtx, cancelStep = context.WithTimeout(ctx, 3*time.Second)
-				defer cancelStep()
-			}
-			err := s.derivation.Step(stepCtx)
+			err := s.derivation.Step(context.Background())
 			stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
if err == io.EOF { s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin(), "err", err) From 8f623e647262a4f0efe20fc7a548f2c7e69f8115 Mon Sep 17 00:00:00 2001 From: Owen <103096885+owen-reorg@users.noreply.github.com> Date: Wed, 17 Jan 2024 16:57:45 +0800 Subject: [PATCH 09/11] doc: update v0.2.4 changelog (#113) --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 987b25b342c7..e0edc9fa3a25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ This is a minor release and upgrading is optional. - #101: optimize(op-node): continue optimizing sequencer step schedule - #104: feat(op-node): pre-fetch receipts concurrently round 2 - #106: optimize: extended expire time for sequencer block broadcasting +- #108: optimize(op-node): increase catching up speed when sequencer lagging +- #109: feat(op-batcher/op-proposer): add InstrumentedClient +- #111: fix(op-node): remove 3s stepCtx for sequencer ### Docker Images From a4e72132199db847699862e3927996242c5a004c Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 26 Jan 2024 17:18:31 +0800 Subject: [PATCH 10/11] fix(op-node): pre-fetching handle L1 reOrg (#115) Co-authored-by: Welkin --- op-node/sources/caching/pre_fetch_cache.go | 3 +- op-node/sources/l1_client.go | 44 +++++- op-node/sources/l1_client_test.go | 167 +++++++++++++++++++++ 3 files changed, 208 insertions(+), 6 deletions(-) create mode 100644 op-node/sources/l1_client_test.go diff --git a/op-node/sources/caching/pre_fetch_cache.go b/op-node/sources/caching/pre_fetch_cache.go index 19842b1a66c7..fa24a612f941 100644 --- a/op-node/sources/caching/pre_fetch_cache.go +++ b/op-node/sources/caching/pre_fetch_cache.go @@ -43,7 +43,8 @@ func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFu defer v.lock.Unlock() v.lock.Lock() if _, ok := v.inner[key]; ok { - return false, false + v.inner[key] = value + return true, false } if v.queue.Size() >= v.maxSize { return false, true diff --git a/op-node/sources/l1_client.go b/op-node/sources/l1_client.go index 5c8bc95cd760..108704bcc142 100644 --- a/op-node/sources/l1_client.go +++ b/op-node/sources/l1_client.go @@ -48,6 +48,8 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide } } +const sequencerConfDepth = 15 + // L1Client provides typed bindings to retrieve L1 data from an RPC source, // with optimized batch requests, cached results, and flag to not trust the RPC // (i.e. to verify all returned contents against corresponding block hashes). 
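The hunks that follow track the parent hash of the oldest block in each pre-fetch batch and, when it no longer links to the tip recorded by the previous iteration, rewind the loop by sequencerConfDepth plus the batch size. A minimal standalone sketch of that rewind rule; sequencerConfDepth mirrors the constant added above, while nextStart, blockRef, and prevTip are hypothetical stand-ins for the patch's actual loop state:

// Sketch only: a simplified model of the reorg check in
// GoOrUpdatePreFetchReceipts, not the patch's actual code.
package main

import "fmt"

const sequencerConfDepth = 15

type blockRef struct {
	Number     uint64
	Hash       string
	ParentHash string
}

// nextStart decides where the next pre-fetch iteration begins. If the oldest
// block just fetched does not link back to the tip recorded by the previous
// iteration, an L1 reorg happened, so rewind far enough to re-fetch every
// block that could have changed.
func nextStart(prevTip string, oldest, newest blockRef, next, taskCount uint64) (uint64, string) {
	if prevTip != "" && oldest.ParentHash != "" && oldest.ParentHash != prevTip && next >= sequencerConfDepth+taskCount {
		return next - sequencerConfDepth - taskCount, "" // rewind, forget the recorded tip
	}
	return next, newest.Hash // advance normally, remember the new tip
}

func main() {
	// The previous iteration ended at tip "a100"; the new batch's oldest
	// block claims parent "b100", so the two histories diverge.
	oldest := blockRef{Number: 101, ParentHash: "b100"}
	newest := blockRef{Number: 105, Hash: "b105"}
	start, tip := nextStart("a100", oldest, newest, 106, 5)
	fmt.Println(start, tip) // 86 "": re-fetch from block 86 onward
}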
@@ -139,6 +141,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 s.log.Info("pre-fetching receipts start", "startBlock", l1Start) go func() { var currentL1Block uint64 + var parentHash *common.Hash for { select { case <-s.done: @@ -147,6 +150,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 case currentL1Block = <-s.preFetchReceiptsStartBlockChan: s.log.Debug("pre-fetching receipts currentL1Block changed", "block", currentL1Block) s.receiptsCache.RemoveAll() + parentHash = nil default: blockRef, err := s.L1BlockRefByLabel(ctx, eth.Unsafe) if err != nil { @@ -169,6 +173,9 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 taskCount = int(blockRef.Number-currentL1Block) + 1 } + blockInfoChan := make(chan eth.L1BlockRef, taskCount) + oldestFetchBlockNumber := currentL1Block + var wg sync.WaitGroup for i := 0; i < taskCount; i++ { wg.Add(1) @@ -179,15 +186,17 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 case <-s.done: return default: - if _, ok := s.receiptsCache.Get(blockNumber); ok { - return - } + pair, ok := s.receiptsCache.Get(blockNumber) blockInfo, err := s.L1BlockRefByNumber(ctx, blockNumber) if err != nil { s.log.Debug("failed to fetch block ref", "err", err, "blockNumber", blockNumber) time.Sleep(1 * time.Second) continue } + if ok && pair.blockHash == blockInfo.Hash { + return + } + isSuccess, err := s.PreFetchReceipts(ctx, blockInfo.Hash) if err != nil { s.log.Warn("failed to pre-fetch receipts", "err", err) @@ -198,14 +207,39 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 time.Sleep(1 * time.Second) continue } - s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number) - break + s.log.Debug("pre-fetching receipts done", "block", blockInfo.Number, "hash", blockInfo.Hash) + blockInfoChan <- blockInfo + return } } }(ctx, currentL1Block) currentL1Block = currentL1Block + 1 } wg.Wait() + close(blockInfoChan) + + //try to find out l1 reOrg and return to an earlier block height for re-prefetching + var latestBlockHash common.Hash + latestBlockNumber := uint64(0) + var oldestBlockParentHash common.Hash + for l1BlockInfo := range blockInfoChan { + if l1BlockInfo.Number > latestBlockNumber { + latestBlockHash = l1BlockInfo.Hash + latestBlockNumber = l1BlockInfo.Number + } + if l1BlockInfo.Number == oldestFetchBlockNumber { + oldestBlockParentHash = l1BlockInfo.ParentHash + } + } + + s.log.Debug("pre-fetching receipts hash", "latestBlockHash", latestBlockHash, "latestBlockNumber", latestBlockNumber, "oldestBlockNumber", oldestFetchBlockNumber, "oldestBlockParentHash", oldestBlockParentHash) + if parentHash != nil && oldestBlockParentHash != (common.Hash{}) && oldestBlockParentHash != *parentHash && currentL1Block >= sequencerConfDepth+uint64(taskCount) { + currentL1Block = currentL1Block - sequencerConfDepth - uint64(taskCount) + s.log.Warn("pre-fetching receipts found l1 reOrg, return to an earlier block height for re-prefetching", "recordParentHash", *parentHash, "unsafeParentHash", oldestBlockParentHash, "number", oldestFetchBlockNumber, "backToNumber", currentL1Block) + parentHash = nil + continue + } + parentHash = &latestBlockHash } } }() diff --git a/op-node/sources/l1_client_test.go b/op-node/sources/l1_client_test.go new file mode 100644 index 000000000000..b5e7fd3c7bc0 --- /dev/null +++ b/op-node/sources/l1_client_test.go @@ -0,0 +1,167 @@ +package sources + +import ( + "context" + "testing" 
+ "time" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" + "github.com/ethereum-optimism/optimism/op-node/testlog" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestGoOrUpdatePreFetchReceipts(t *testing.T) { + t.Run("handleReOrg", func(t *testing.T) { + m := new(mockRPC) + ctx := context.Background() + clientLog := testlog.Logger(t, log.LvlDebug) + latestHead := &rpcHeader{ + ParentHash: randHash(), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: 100, + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{"latest", false}).Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = latestHead + }).Return([]error{nil}) + for i := 81; i <= 90; i++ { + currentHead := &rpcHeader{ + ParentHash: randHash(), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: hexutil.Uint64(i), + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + currentBlock := &rpcBlock{ + rpcHeader: *currentHead, + Transactions: []*types.Transaction{}, + } + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = currentHead + }).Return([]error{nil}) + m.On("CallContext", ctx, new(*rpcBlock), + "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcBlock) = currentBlock + }).Return([]error{nil}) + } + for i := 91; i <= 100; i++ { + currentHead := &rpcHeader{ + ParentHash: randHash(), + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: hexutil.Uint64(i), + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = currentHead + }).Return([]error{nil}) + currentBlock := &rpcBlock{ + rpcHeader: *currentHead, + Transactions: []*types.Transaction{}, + } + m.On("CallContext", ctx, new(*rpcBlock), + "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcBlock) = currentBlock + }).Return([]error{nil}) + } + var lastParentHeader common.Hash + var real100Hash common.Hash + for i := 76; i <= 100; i++ { + currentHead := &rpcHeader{ + ParentHash: lastParentHeader, + UncleHash: common.Hash{}, + Coinbase: common.Address{}, + Root: types.EmptyRootHash, + 
TxHash: types.EmptyTxsHash, + ReceiptHash: types.EmptyReceiptsHash, + Bloom: eth.Bytes256{}, + Difficulty: hexutil.Big{}, + Number: hexutil.Uint64(i), + GasLimit: 0, + GasUsed: 0, + Time: 0, + Extra: nil, + MixDigest: common.Hash{}, + Nonce: types.BlockNonce{}, + BaseFee: nil, + WithdrawalsRoot: nil, + Hash: randHash(), + } + if i == 100 { + real100Hash = currentHead.Hash + } + lastParentHeader = currentHead.Hash + m.On("CallContext", ctx, new(*rpcHeader), + "eth_getBlockByNumber", []any{numberID(i).Arg(), false}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcHeader) = currentHead + }).Return([]error{nil}) + currentBlock := &rpcBlock{ + rpcHeader: *currentHead, + Transactions: []*types.Transaction{}, + } + m.On("CallContext", ctx, new(*rpcBlock), + "eth_getBlockByHash", []any{currentHead.Hash, true}).Once().Run(func(args mock.Arguments) { + *args[1].(**rpcBlock) = currentBlock + }).Return([]error{nil}) + } + s, err := NewL1Client(m, clientLog, nil, L1ClientDefaultConfig(&rollup.Config{SeqWindowSize: 1000}, true, RPCKindBasic)) + require.NoError(t, err) + err2 := s.GoOrUpdatePreFetchReceipts(ctx, 81) + require.NoError(t, err2) + time.Sleep(1 * time.Second) + pair, ok := s.receiptsCache.Get(100) + require.True(t, ok, "100 cache miss") + require.Equal(t, real100Hash, pair.blockHash, "block 100 hash is different,want:%s,but:%s", real100Hash, pair.blockHash) + _, ok2 := s.receiptsCache.Get(76) + require.True(t, ok2, "76 cache miss") + }) +} From 5c5bf6313cf14137b22c3b4426f8ec74d54519c1 Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Tue, 30 Jan 2024 16:06:10 +0800 Subject: [PATCH 11/11] fix(op-node): pre-fetching handle L1 reOrg round 2 (#117) Co-authored-by: Welkin --- op-node/sources/l1_client.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/op-node/sources/l1_client.go b/op-node/sources/l1_client.go index 108704bcc142..4c5674725a89 100644 --- a/op-node/sources/l1_client.go +++ b/op-node/sources/l1_client.go @@ -141,7 +141,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 s.log.Info("pre-fetching receipts start", "startBlock", l1Start) go func() { var currentL1Block uint64 - var parentHash *common.Hash + var parentHash common.Hash for { select { case <-s.done: @@ -150,7 +150,7 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 case currentL1Block = <-s.preFetchReceiptsStartBlockChan: s.log.Debug("pre-fetching receipts currentL1Block changed", "block", currentL1Block) s.receiptsCache.RemoveAll() - parentHash = nil + parentHash = common.Hash{} default: blockRef, err := s.L1BlockRefByLabel(ctx, eth.Unsafe) if err != nil { @@ -194,13 +194,15 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 continue } if ok && pair.blockHash == blockInfo.Hash { + blockInfoChan <- blockInfo return } isSuccess, err := s.PreFetchReceipts(ctx, blockInfo.Hash) if err != nil { s.log.Warn("failed to pre-fetch receipts", "err", err) - return + time.Sleep(1 * time.Second) + continue } if !isSuccess { s.log.Debug("pre fetch receipts fail without error,need retry", "blockHash", blockInfo.Hash, "blockNumber", blockNumber) @@ -233,13 +235,13 @@ func (s *L1Client) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Start uint6 } s.log.Debug("pre-fetching receipts hash", "latestBlockHash", latestBlockHash, "latestBlockNumber", latestBlockNumber, "oldestBlockNumber", oldestFetchBlockNumber, "oldestBlockParentHash", oldestBlockParentHash) - 
if parentHash != nil && oldestBlockParentHash != (common.Hash{}) && oldestBlockParentHash != *parentHash && currentL1Block >= sequencerConfDepth+uint64(taskCount) { + if parentHash != (common.Hash{}) && oldestBlockParentHash != (common.Hash{}) && oldestBlockParentHash != parentHash && currentL1Block >= sequencerConfDepth+uint64(taskCount) { currentL1Block = currentL1Block - sequencerConfDepth - uint64(taskCount) - s.log.Warn("pre-fetching receipts found l1 reOrg, return to an earlier block height for re-prefetching", "recordParentHash", *parentHash, "unsafeParentHash", oldestBlockParentHash, "number", oldestFetchBlockNumber, "backToNumber", currentL1Block) - parentHash = nil + s.log.Warn("pre-fetching receipts found l1 reOrg, return to an earlier block height for re-prefetching", "recordParentHash", parentHash, "unsafeParentHash", oldestBlockParentHash, "number", oldestFetchBlockNumber, "backToNumber", currentL1Block) + parentHash = common.Hash{} continue } - parentHash = &latestBlockHash + parentHash = latestBlockHash } } }()
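Patch 11's switch from *common.Hash to common.Hash leans on Go's zero value as the "nothing recorded yet" sentinel: no real L1 block hashes to all zero bytes, so common.Hash{} can safely mean "unset", and the pointer, its nil checks, and its aliasing across loop iterations all go away. A small illustration, assuming only go-ethereum's common package:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
)

func main() {
	var parentHash common.Hash // zero value doubles as "unset"
	fmt.Println(parentHash == (common.Hash{})) // true: no tip recorded yet

	parentHash = common.HexToHash("0xabc123") // record a tip
	fmt.Println(parentHash == (common.Hash{})) // false

	// Resetting after a detected reorg is a plain value assignment; there is
	// no pointer to nil out and nothing shared between iterations.
	parentHash = common.Hash{}
	fmt.Println(parentHash == (common.Hash{})) // true again
}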
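Looking back at patch 09's InstrumentedClient work, the instrumentation pattern is worth spelling out: RecordRPCClientRequest bumps the request counter, starts a Prometheus timer, and returns a closure; instrument1/instrument2 run the wrapped call and invoke that closure with the final error, so counting, timing, and error labeling happen in one place. A hedged usage sketch: instrument2 is copied from the diff, while the one-method Metricer shape and demoMetricer are assumptions made for the demo (the real client.Metricer may carry more methods):

package main

import (
	"errors"
	"fmt"
	"time"
)

// Metricer is the single method instrument1/instrument2 need; in the diff
// above, TxMetrics satisfies it via RecordRPCClientRequest.
type Metricer interface {
	RecordRPCClientRequest(method string) func(err error)
}

// demoMetricer is a hypothetical stand-in that logs instead of updating
// Prometheus counters and histograms.
type demoMetricer struct{}

func (demoMetricer) RecordRPCClientRequest(method string) func(err error) {
	start := time.Now()
	fmt.Printf("request:  %s\n", method)
	return func(err error) {
		fmt.Printf("response: %s err=%v took=%s\n", method, err, time.Since(start))
	}
}

// instrument2, as in the diff: wrap any call returning (O, error) so every
// RPC is recorded exactly once, on both success and failure.
func instrument2[O any](m Metricer, name string, cb func() (O, error)) (O, error) {
	record := m.RecordRPCClientRequest(name)
	res, err := cb()
	record(err)
	return res, err
}

func main() {
	m := demoMetricer{}

	n, err := instrument2(m, "eth_blockNumber", func() (uint64, error) {
		return 19_000_000, nil
	})
	fmt.Println(n, err)

	_, err = instrument2(m, "eth_getBlockByHash", func() (*struct{}, error) {
		return nil, errors.New("not found")
	})
	fmt.Println(err)
}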