Skip to content

Commit

Permalink
fix: op-node in merge upstream v1.7.2
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoieh committed Apr 23, 2024
1 parent 633e57c commit e720f6e
Show file tree
Hide file tree
Showing 17 changed files with 61 additions and 46 deletions.
4 changes: 0 additions & 4 deletions op-node/chaincfg/chains_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,3 @@ var sepoliaDev0Cfg = rollup.Config{
EcotoneTime: u64Ptr(1706634000),
ProtocolVersionsAddress: common.HexToAddress("0x252CbE9517F731C618961D890D534183822dcC8d"),
}

func u64Ptr(v uint64) *uint64 {
return &v
}
6 changes: 0 additions & 6 deletions op-node/p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,6 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti
return pubsub.ValidationReject
}

// [REJECT] if a V2 Block has non-empty withdrawals
if blockVersion == eth.BlockV2 && len(*payload.Withdrawals) != 0 {
log.Warn("payload is on v2 topic, but has non-empty withdrawals", "bad_hash", payload.BlockHash.String(), "withdrawal_count", len(*payload.Withdrawals))
return pubsub.ValidationReject
}

seen, ok := blockHeightLRU.Get(uint64(payload.BlockNumber))
if !ok {
seen = new(seenBlocks)
Expand Down
4 changes: 2 additions & 2 deletions op-node/rollup/derive/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type L1ReceiptsFetcher interface {

type SystemConfigL2Fetcher interface {
SystemConfigByL2Hash(ctx context.Context, hash common.Hash) (eth.SystemConfig, error)
CachePayloadByHash(payload *eth.ExecutionPayload) bool
CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool
}

// FetchingAttributesBuilder fetches inputs for the building of L2 payload attributes on the fly.
Expand Down Expand Up @@ -173,7 +173,7 @@ func (ba *FetchingAttributesBuilder) PreparePayloadAttributes(ctx context.Contex
}, nil
}

func (ba *FetchingAttributesBuilder) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
func (ba *FetchingAttributesBuilder) CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool {
return ba.l2.CachePayloadByHash(payload)
}

Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/attributes_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

type AttributesBuilder interface {
PreparePayloadAttributes(ctx context.Context, l2Parent eth.L2BlockRef, epoch eth.BlockID) (attrs *eth.PayloadAttributes, err error)
CachePayloadByHash(payload *eth.ExecutionPayload) bool
CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool
}

type AttributesQueue struct {
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/derive/engine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (e *EngineController) ConfirmPayload(ctx context.Context, agossip async.Asy
}
// Update the safe head if the payload is built with the last attributes in the batch.
updateSafe := e.buildingSafe && e.safeAttrs != nil && e.safeAttrs.isLastInSpan
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor)
envelope, errTyp, err := confirmPayload(ctx, e.log, e.engine, fc, e.buildingInfo, updateSafe, agossip, sequencerConductor, e.metrics)
if err != nil {
return nil, errTyp, fmt.Errorf("failed to complete building on top of L2 chain %s, id: %s, error (%d): %w", e.buildingOnto, e.buildingInfo.ID, errTyp, err)
}
Expand Down
1 change: 0 additions & 1 deletion op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,6 @@ func TestVerifyNewL1Origin(t *testing.T) {

// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = test.newOrigin
eq.UnsafeL2Head()
if test.verifyPass {
l1F.ExpectClearReceiptsCacheBefore(refB.Number)
}
Expand Down
4 changes: 4 additions & 0 deletions op-node/rollup/derive/engine_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func confirmPayload(
updateSafe bool,
agossip async.AsyncGossiper,
sequencerConductor conductor.SequencerConductor,
metrics Metrics,
) (out *eth.ExecutionPayloadEnvelope, errTyp BlockInsertionErrType, err error) {
var envelope *eth.ExecutionPayloadEnvelope
// if the payload is available from the async gossiper, it means it was not yet imported, so we reuse it
Expand All @@ -141,7 +142,9 @@ func confirmPayload(
"parent", envelope.ExecutionPayload.ParentHash,
"txs", len(envelope.ExecutionPayload.Transactions))
} else {
start := time.Now()
envelope, err = eng.GetPayload(ctx, payloadInfo)
metrics.RecordSequencerStepTime("getPayload", time.Since(start))
}
if err != nil {
// even if it is an input-error (unknown payload ID), it is temporary, since we will re-attempt the full payload building, not just the retrieval of the payload.
Expand All @@ -158,6 +161,7 @@ func confirmPayload(
// agossip.Clear() will be called later if an non-temporary error is found, or if the payload is successfully inserted
agossip.Gossip(envelope)

start := time.Now()
status, err := eng.NewPayload(ctx, payload, envelope.ParentBeaconBlockRoot)
if err != nil {
return nil, BlockInsertTemporaryErr, fmt.Errorf("failed to insert execution payload: %w", err)
Expand Down
5 changes: 1 addition & 4 deletions op-node/rollup/driver/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
// When block produce is interrupted by high L1 latency, sequencer will build a full block periodically to avoid chain stuck
const buildFullBlockInterval = 20

// When block produce is lagging exceed lagTimeWindow, sequencer will set attrs.NoTxPool to true to quickly catch up
const lagTimeWindow = 2 * time.Minute

type Downloader interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
Expand Down Expand Up @@ -254,10 +251,10 @@ func (d *Sequencer) RunNextSequencerAction(ctx context.Context, agossip async.As
return nil, nil
} else {
payload := envelope.ExecutionPayload
d.attrBuilder.CachePayloadByHash(payload)
if len(payload.Transactions) == 1 {
d.accEmptyBlocks += 1
}
d.attrBuilder.CachePayloadByHash(envelope)
d.log.Info("sequencer successfully built a new block", "block", payload.ID(), "time", uint64(payload.Timestamp), "txs", len(payload.Transactions))
return envelope, nil
}
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,8 @@ func (c *Config) LogDescription(log log.Logger, l2Chains map[string]string) {
"ecotone_time", fmtForkTimeOrUnset(c.EcotoneTime),
"fjord_time", fmtForkTimeOrUnset(c.FjordTime),
"interop_time", fmtForkTimeOrUnset(c.InteropTime),
"fermat", c.Fermat,
"snow_time", fmtForkTimeOrUnset(c.SnowTime),
)
}

Expand Down
8 changes: 0 additions & 8 deletions op-node/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,6 @@ Conflicting configuration is deprecated, and will stop the op-node from starting
if err != nil {
return nil, err
}
if ctx.IsSet(flags.CanyonOverrideFlag.Name) {
canyon := ctx.Uint64(flags.CanyonOverrideFlag.Name)
config.CanyonTime = &canyon
}
return &config, nil
}

Expand All @@ -250,10 +246,6 @@ Conflicting configuration is deprecated, and will stop the op-node from starting
return &rollupConfig, nil
}

if ctx.IsSet(flags.CanyonOverrideFlag.Name) {
canyon := ctx.Uint64(flags.CanyonOverrideFlag.Name)
presetRollupConfig.CanyonTime = &canyon
}
log.Warn("using preset rollup config of", rollupConfig.L2ChainID, "overwrite rollup file")
return &presetRollupConfig, nil
}
Expand Down
2 changes: 1 addition & 1 deletion op-program/client/l2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,6 @@ func (o *OracleEngine) SystemConfigByL2Hash(ctx context.Context, hash common.Has
return derive.PayloadToSystemConfig(o.rollupCfg, payload.ExecutionPayload)
}

func (o *OracleEngine) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
func (o *OracleEngine) CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool {
return true
}
4 changes: 4 additions & 0 deletions op-service/sources/caching/pre_fetch_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (v *PreFetchCache[V]) AddIfNotFull(key uint64, value V) (success bool, isFu
return true, false
}

func (v *PreFetchCache[V]) IsFull() bool {
return v.queue.Size() >= v.maxSize
}

func (v *PreFetchCache[V]) Get(key uint64, recordMetrics bool) (V, bool) {
defer v.lock.Unlock()
v.lock.Lock()
Expand Down
15 changes: 12 additions & 3 deletions op-service/sources/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ type EthClient struct {
// cache payloads by hash
// common.Hash -> *eth.ExecutionPayload
payloadsCache *caching.LRUCache[common.Hash, *eth.ExecutionPayloadEnvelope]

// isReadOrderly Indicates whether the client often reads data in order of block height.
// If so, the process of reading the cache will be different to ensure a high cache hit rate.
isReadOrderly bool
}

// NewEthClient returns an [EthClient], wrapping an RPC with bindings to fetch ethereum data with added error logging,
Expand All @@ -146,6 +150,7 @@ func NewEthClient(client client.RPC, log log.Logger, metrics caching.Metrics, co
transactionsCache: caching.NewLRUCache[common.Hash, types.Transactions](metrics, "txs", config.TransactionsCacheSize),
headersCache: caching.NewLRUCache[common.Hash, eth.BlockInfo](metrics, "headers", config.HeadersCacheSize),
payloadsCache: caching.NewLRUCache[common.Hash, *eth.ExecutionPayloadEnvelope](metrics, "payloads", config.PayloadsCacheSize),
isReadOrderly: isReadOrderly,
}, nil
}

Expand Down Expand Up @@ -312,6 +317,10 @@ func (s *EthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.E
return s.payloadCall(ctx, "eth_getBlockByHash", hashID(hash))
}

func (s *EthClient) CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool {
return s.payloadsCache.Add(payload.ExecutionPayload.BlockHash, payload)
}

func (s *EthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayloadEnvelope, error) {
return s.payloadCall(ctx, "eth_getBlockByNumber", numberID(number))
}
Expand Down Expand Up @@ -342,15 +351,15 @@ func (s *EthClient) PreFetchReceipts(ctx context.Context, blockHash common.Hash)
func (s *EthClient) fetchReceiptsInner(ctx context.Context, blockHash common.Hash, isForPreFetch bool) (eth.BlockInfo, types.Receipts, error, bool) {
info, txs, err := s.infoAndTxsByHash(ctx, blockHash, !isForPreFetch)
if err != nil {
return nil, nil, fmt.Errorf("querying block: %w", err)
return nil, nil, fmt.Errorf("querying block: %w", err), false
}

txHashes, _ := eth.TransactionsToHashes(txs), eth.ToBlockID(info)
receipts, err := s.recProvider.FetchReceipts(ctx, info, txHashes)
receipts, err, isFull := s.recProvider.FetchReceipts(ctx, info, txHashes, isForPreFetch)
if err != nil {
return nil, nil, err, isFull
}
return info, receipts, nil
return info, receipts, nil, isFull
}

// GetProof returns an account proof result, with any optional requested storage proofs.
Expand Down
12 changes: 12 additions & 0 deletions op-service/sources/receipts.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,19 @@ import (
"github.com/ethereum/go-ethereum/trie"
)

type ReceiptsHashPair struct {
blockHash common.Hash
receipts types.Receipts
}

type ReceiptsProvider interface {
// FetchReceipts returns a block info and all of the receipts associated with transactions in the block.
// It verifies the receipt hash in the block header against the receipt hash of the fetched receipts
// to ensure that the execution engine did not fail to return any receipts.
FetchReceipts(ctx context.Context, blockInfo eth.BlockInfo, txHashes []common.Hash, isForPreFetch bool) (types.Receipts, error, bool)
}

type InnerReceiptsProvider interface {
// FetchReceipts returns a block info and all of the receipts associated with transactions in the block.
// It verifies the receipt hash in the block header against the receipt hash of the fetched receipts
// to ensure that the execution engine did not fail to return any receipts.
Expand Down
32 changes: 19 additions & 13 deletions op-service/sources/receipts_caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ import (
// A CachingReceiptsProvider caches successful receipt fetches from the inner
// ReceiptsProvider. It also avoids duplicate in-flight requests per block hash.
type CachingReceiptsProvider struct {
inner ReceiptsProvider
cache *caching.LRUCache[common.Hash, types.Receipts]

inner InnerReceiptsProvider
cache *caching.PreFetchCache[*ReceiptsHashPair]
// lock fetching process for each block hash to avoid duplicate requests
fetching map[common.Hash]*sync.Mutex
fetchingMu sync.Mutex // only protects map
}

func NewCachingReceiptsProvider(inner ReceiptsProvider, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
func NewCachingReceiptsProvider(inner InnerReceiptsProvider, m caching.Metrics, cacheSize int) *CachingReceiptsProvider {
return &CachingReceiptsProvider{
inner: inner,
cache: caching.NewLRUCache[common.Hash, types.Receipts](m, "receipts", cacheSize),
cache: caching.NewPreFetchCache[*ReceiptsHashPair](m, "receipts", cacheSize),
fetching: make(map[common.Hash]*sync.Mutex),
}
}
Expand Down Expand Up @@ -53,30 +52,37 @@ func (p *CachingReceiptsProvider) deleteFetchingLock(blockHash common.Hash) {

// FetchReceipts fetches receipts for the given block and transaction hashes
// it expects that the inner FetchReceipts implementation handles validation
func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, blockInfo eth.BlockInfo, txHashes []common.Hash) (types.Receipts, error) {
func (p *CachingReceiptsProvider) FetchReceipts(ctx context.Context, blockInfo eth.BlockInfo, txHashes []common.Hash, isForPreFetch bool) (types.Receipts, error, bool) {
block := eth.ToBlockID(blockInfo)
if r, ok := p.cache.Get(block.Hash); ok {
return r, nil
var isFull bool

if v, ok := p.cache.Get(block.Number, !isForPreFetch); ok && v.blockHash == block.Hash {
return v.receipts, nil, isFull
}

mu := p.getOrCreateFetchingLock(block.Hash)
mu.Lock()
defer mu.Unlock()
// Other routine might have fetched in the meantime
if r, ok := p.cache.Get(block.Hash); ok {
if v, ok := p.cache.Get(block.Number, !isForPreFetch); ok && v.blockHash == block.Hash {
// we might have created a new lock above while the old
// fetching job completed.
p.deleteFetchingLock(block.Hash)
return r, nil
return v.receipts, nil, isFull
}

isFull = p.cache.IsFull()
if isForPreFetch && isFull {
return nil, nil, true
}

r, err := p.inner.FetchReceipts(ctx, blockInfo, txHashes)
if err != nil {
return nil, err
return nil, err, isFull
}

p.cache.Add(block.Hash, r)
p.cache.AddIfNotFull(block.Number, &ReceiptsHashPair{blockHash: block.Hash, receipts: r})
// result now in cache, can delete fetching lock
p.deleteFetchingLock(block.Hash)
return r, nil
return r, nil, isFull
}
2 changes: 1 addition & 1 deletion op-service/testutils/mock_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ func (m *MockEngine) ExpectNewPayload(payload *eth.ExecutionPayload, parentBeaco
m.Mock.On("NewPayload", payload, parentBeaconBlockRoot).Once().Return(result, err)
}

func (m *MockEngine) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
func (m *MockEngine) CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool {
return true
}
2 changes: 1 addition & 1 deletion op-service/testutils/mock_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ func (m *MockL2Client) ExpectOutputV0AtBlock(blockHash common.Hash, output *eth.
m.Mock.On("OutputV0AtBlock", blockHash).Once().Return(output, err)
}

func (m *MockL2Client) CachePayloadByHash(payload *eth.ExecutionPayload) bool {
func (m *MockL2Client) CachePayloadByHash(payload *eth.ExecutionPayloadEnvelope) bool {
return true
}

0 comments on commit e720f6e

Please sign in to comment.