Skip to content

Commit

Permalink
Merge branch opbnb/develop into merge-upstream-v1.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bnoieh committed Feb 8, 2024
2 parents 56be93a + 5c5bf63 commit 7c6c489
Show file tree
Hide file tree
Showing 32 changed files with 806 additions and 57 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ This is a minor release and upgrading is optional.
- #87: optimize(op-node): make block produce stable when L1 latency unstable
- #89: feat(op-node): add opBNB bootnodes
- #94: fix(op-node/op-batcher): fallbackClient should ignore ethereum.NotFound error
- #100: feature(op-node): pre-fetch receipts concurrently
- #101: optimize(op-node): continue optimizing sequencer step schedule
- #104: feat(op-node): pre-fetch receipts concurrently round 2
- #106: optimize: extended expire time for sequencer block broadcasting
- #108: optimize(op-node): increase catching up speed when sequencer lagging
- #109: feat(op-batcher/op-proposer): add InstrumentedClient
- #111: fix(op-node): remove 3s stepCtx for sequencer

### Docker Images

Expand Down
9 changes: 4 additions & 5 deletions op-batcher/batcher/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
Expand Down Expand Up @@ -39,8 +38,8 @@ type BatcherConfig struct {
type BatcherService struct {
Log log.Logger
Metrics metrics.Metricer
L1Client client.ETHClient
L2Client *ethclient.Client
L1Client client.Client
L2Client client.Client
RollupNode *sources.RollupClient
TxManager txmgr.TxManager

Expand Down Expand Up @@ -122,13 +121,13 @@ func (bs *BatcherService) initRPCClients(ctx context.Context, cfg *CLIConfig) er
if err != nil {
return fmt.Errorf("failed to dial L1 RPC: %w", err)
}
bs.L1Client = l1Client
bs.L1Client = client.NewInstrumentedClientWithClient(l1Client, bs.Metrics)

l2Client, err := dial.DialEthClientWithTimeout(ctx, dial.DefaultDialTimeout, bs.Log, cfg.L2EthRpc)
if err != nil {
return fmt.Errorf("failed to dial L2 engine RPC: %w", err)
}
bs.L2Client = l2Client
bs.L2Client = client.NewInstrumentedClientWithClient(l2Client, bs.Metrics)

rollupClient, err := dial.DialRollupClientWithTimeout(ctx, dial.DefaultDialTimeout, bs.Log, cfg.RollupRpc)
if err != nil {
Expand Down
70 changes: 70 additions & 0 deletions op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"errors"
"fmt"
"io"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -9,14 +11,17 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/client"
"github.com/ethereum-optimism/optimism/op-service/eth"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)

const Namespace = "op_batcher"
const RPCClientSubsystem = "rpc_client"

type Metricer interface {
RecordInfo(version string)
Expand Down Expand Up @@ -47,6 +52,7 @@ type Metricer interface {
RecordBatchTxFailed()

Document() []opmetrics.DocumentedMetric
client.Metricer
}

type Metrics struct {
Expand Down Expand Up @@ -79,6 +85,10 @@ type Metrics struct {
channelOutputBytesTotal prometheus.Counter

batcherTxEvs opmetrics.EventVec

RPCClientRequestsTotal *prometheus.CounterVec
RPCClientRequestDurationSeconds *prometheus.HistogramVec
RPCClientResponsesTotal *prometheus.CounterVec
}

var _ Metricer = (*Metrics)(nil)
Expand Down Expand Up @@ -183,6 +193,33 @@ func NewMetrics(procName string) *Metrics {
}),

batcherTxEvs: opmetrics.NewEventVec(factory, ns, "", "batcher_tx", "BatcherTx", []string{"stage"}),

RPCClientRequestsTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "requests_total",
Help: "Total RPC requests initiated by the op-batcher's RPC client",
}, []string{
"method",
}),
RPCClientRequestDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "request_duration_seconds",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
Help: "Histogram of RPC client request durations",
}, []string{
"method",
}),
RPCClientResponsesTotal: factory.NewCounterVec(prometheus.CounterOpts{
Namespace: ns,
Subsystem: RPCClientSubsystem,
Name: "responses_total",
Help: "Total RPC request responses received by the op-batcher's RPC client",
}, []string{
"method",
"error",
}),
}
}

Expand Down Expand Up @@ -304,6 +341,39 @@ func (m *Metrics) RecordBatchTxFailed() {
m.batcherTxEvs.Record(TxStageFailed)
}

func (m *Metrics) RecordRPCClientRequest(method string) func(err error) {
m.RPCClientRequestsTotal.WithLabelValues(method).Inc()
timer := prometheus.NewTimer(m.RPCClientRequestDurationSeconds.WithLabelValues(method))
return func(err error) {
m.RecordRPCClientResponse(method, err)
timer.ObserveDuration()
}
}

// RecordRPCClientResponse records an RPC response. It will
// convert the passed-in error into something metrics friendly.
// Nil errors get converted into <nil>, RPC errors are converted
// into rpc_<error code>, HTTP errors are converted into
// http_<status code>, and everything else is converted into
// <unknown>.
func (m *Metrics) RecordRPCClientResponse(method string, err error) {
var errStr string
var rpcErr rpc.Error
var httpErr rpc.HTTPError
if err == nil {
errStr = "<nil>"
} else if errors.As(err, &rpcErr) {
errStr = fmt.Sprintf("rpc_%d", rpcErr.ErrorCode())
} else if errors.As(err, &httpErr) {
errStr = fmt.Sprintf("http_%d", httpErr.StatusCode)
} else if errors.Is(err, ethereum.NotFound) {
errStr = "<not found>"
} else {
errStr = "<unknown>"
}
m.RPCClientResponsesTotal.WithLabelValues(method, errStr).Inc()
}

// estimateBatchSize estimates the size of the batch
func estimateBatchSize(block *types.Block) uint64 {
size := uint64(70) // estimated overhead of batch metadata
Expand Down
5 changes: 5 additions & 0 deletions op-batcher/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ func (*noopMetrics) StartBalanceMetrics(log.Logger, ethereum.ChainStateReader, c
}
func (m *noopMetrics) RecordL1UrlSwitchEvt(url string) {
}

func (m *noopMetrics) RecordRPCClientRequest(method string) func(err error) {
return func(err error) {
}
}
10 changes: 5 additions & 5 deletions op-node/p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ const (
globalValidateThrottle = 512
gossipHeartbeat = 500 * time.Millisecond
// seenMessagesTTL limits the duration that message IDs are remembered for gossip deduplication purposes
// 130 * gossipHeartbeat
seenMessagesTTL = 130 * gossipHeartbeat
// 2500 * gossipHeartbeat
seenMessagesTTL = 2500 * gossipHeartbeat
DefaultMeshD = 8 // topic stable mesh target count
DefaultMeshDlo = 6 // topic stable mesh low watermark
DefaultMeshDhi = 12 // topic stable mesh high watermark
Expand Down Expand Up @@ -247,7 +247,7 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti

// Seen block hashes per block height
// uint64 -> *seenBlocks
blockHeightLRU, err := lru.New[uint64, *seenBlocks](1000)
blockHeightLRU, err := lru.New[uint64, *seenBlocks](1500)
if err != nil {
panic(fmt.Errorf("failed to set up block height LRU cache: %w", err))
}
Expand Down Expand Up @@ -296,8 +296,8 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti
// rounding down to seconds is fine here.
now := uint64(time.Now().Unix())

// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
if uint64(payload.Timestamp) < now-60 {
// [REJECT] if the `payload.timestamp` is older than 20 min in the past
if uint64(payload.Timestamp) < now-1200 {
log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
return pubsub.ValidationReject
}
Expand Down
2 changes: 2 additions & 0 deletions op-node/rollup/derive/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ type L1ReceiptsFetcher interface {
InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error)
InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error)
GoOrUpdatePreFetchReceipts(ctx context.Context, l1StartBlock uint64) error
ClearReceiptsCacheBefore(blockNumber uint64)
}

type SystemConfigL2Fetcher interface {
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/derive/engine_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (eq *EngineQueue) postProcessSafeL2() {
eq.log.Debug("updated finality-data", "last_l1", last.L1Block, "last_l2", last.L2Block)
}
}
eq.l1Fetcher.ClearReceiptsCacheBefore(eq.safeHead.L1Origin.Number)
}

func (eq *EngineQueue) logSyncProgress(reason string) {
Expand Down Expand Up @@ -794,7 +795,7 @@ func (eq *EngineQueue) Reset(ctx context.Context, _ eth.L1BlockRef, _ eth.System
if err != nil {
return NewTemporaryError(fmt.Errorf("failed to fetch L1 config of L2 block %s: %w", pipelineL2.ID(), err))
}
err2 := eq.l1Fetcher.GoOrUpdatePreFetchReceipts(ctx, pipelineOrigin.Number)
err2 := eq.l1Fetcher.GoOrUpdatePreFetchReceipts(context.Background(), pipelineOrigin.Number)
if err2 != nil {
return NewTemporaryError(fmt.Errorf("failed to run pre fetch L1 receipts for L1 start block %s: %w", pipelineOrigin.ID(), err2))
}
Expand Down
17 changes: 17 additions & 0 deletions op-node/rollup/derive/engine_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,14 @@ func TestEngineQueue_Finalize(t *testing.T) {
eq.origin = refD
prev.origin = refD
eq.safeHead = refC1
l1F.ExpectClearReceiptsCacheBefore(refC1.L1Origin.Number)
eq.postProcessSafeL2()

// now say D0 was included in E and became the new safe head
eq.origin = refE
prev.origin = refE
eq.safeHead = refD0
l1F.ExpectClearReceiptsCacheBefore(refD0.L1Origin.Number)
eq.postProcessSafeL2()

// let's finalize D (current L1), from which we fully derived C1 (it was safe head), but not D0 (included in E)
Expand Down Expand Up @@ -703,16 +705,19 @@ func TestVerifyNewL1Origin(t *testing.T) {
newOrigin eth.L1BlockRef
expectReset bool
expectedFetchBlocks map[uint64]eth.L1BlockRef
verifyPass bool
}{
{
name: "L1OriginBeforeUnsafeOrigin",
newOrigin: refD,
expectReset: false,
verifyPass: true,
},
{
name: "Matching",
newOrigin: refF,
expectReset: false,
verifyPass: true,
},
{
name: "BlockNumberEqualDifferentHash",
Expand All @@ -723,11 +728,13 @@ func TestVerifyNewL1Origin(t *testing.T) {
Time: refF.Time,
},
expectReset: true,
verifyPass: false,
},
{
name: "UnsafeIsParent",
newOrigin: refG,
expectReset: false,
verifyPass: true,
},
{
name: "UnsafeIsParentNumberDifferentHash",
Expand All @@ -738,6 +745,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
Time: refG.Time,
},
expectReset: true,
verifyPass: false,
},
{
name: "UnsafeIsOlderCanonical",
Expand All @@ -746,6 +754,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
expectedFetchBlocks: map[uint64]eth.L1BlockRef{
refF.Number: refF,
},
verifyPass: true,
},
{
name: "UnsafeIsOlderNonCanonical",
Expand All @@ -765,6 +774,7 @@ func TestVerifyNewL1Origin(t *testing.T) {
Time: refF.Time,
},
},
verifyPass: false,
},
}
for _, test := range tests {
Expand Down Expand Up @@ -839,6 +849,9 @@ func TestVerifyNewL1Origin(t *testing.T) {
// L1 chain reorgs so new origin is at same slot as refF but on a different fork
prev.origin = test.newOrigin
eq.UnsafeL2Head()
if test.verifyPass {
l1F.ExpectClearReceiptsCacheBefore(refB.Number)
}
err = eq.Step(context.Background())
if test.expectReset {
require.ErrorIs(t, err, ErrReset, "should reset pipeline due to mismatched origin")
Expand Down Expand Up @@ -938,6 +951,7 @@ func TestBlockBuildingRace(t *testing.T) {

// Expect initial forkchoice update
eng.ExpectForkchoiceUpdate(preFc, nil, preFcRes, nil)
l1F.ExpectClearReceiptsCacheBefore(refA.Number)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")

// Expect initial building update, to process the attributes we queued up
Expand Down Expand Up @@ -1005,6 +1019,7 @@ func TestBlockBuildingRace(t *testing.T) {
}
eng.ExpectForkchoiceUpdate(postFc, nil, postFcRes, nil)

l1F.ExpectClearReceiptsCacheBefore(refA.Number)
// Now complete the job, as external user of the engine
_, _, err = eq.ConfirmPayload(context.Background())
require.NoError(t, err)
Expand Down Expand Up @@ -1093,6 +1108,7 @@ func TestResetLoop(t *testing.T) {
eq.engineSyncTarget = refA2
eq.safeHead = refA1
eq.finalized = refA0
l1F.ExpectClearReceiptsCacheBefore(refA.Number)

// Qeueue up the safe attributes
require.Nil(t, eq.safeAttributes)
Expand All @@ -1111,6 +1127,7 @@ func TestResetLoop(t *testing.T) {
eng.ExpectForkchoiceUpdate(preFc, nil, nil, nil)
require.NoError(t, eq.Step(context.Background()), "clean forkchoice state after reset")

l1F.ExpectClearReceiptsCacheBefore(refA.Number)
// Crux of the test. Should be in a valid state after the reset.
require.ErrorIs(t, eq.Step(context.Background()), NotEnoughData, "Should be able to step after a reset")

Expand Down
1 change: 1 addition & 0 deletions op-node/rollup/derive/l1_traversal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
type L1BlockRefByNumberFetcher interface {
L1BlockRefByNumber(context.Context, uint64) (eth.L1BlockRef, error)
FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error)
PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error)
}

type L1Traversal struct {
Expand Down
10 changes: 10 additions & 0 deletions op-node/rollup/driver/metered_l1fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (m *MeteredL1Fetcher) FetchReceipts(ctx context.Context, blockHash common.H
return m.inner.FetchReceipts(ctx, blockHash)
}

func (m *MeteredL1Fetcher) PreFetchReceipts(ctx context.Context, blockHash common.Hash) (bool, error) {
defer m.recordTime("PreFetchReceipts")()
return m.inner.PreFetchReceipts(ctx, blockHash)
}

var _ derive.L1Fetcher = (*MeteredL1Fetcher)(nil)

func (m *MeteredL1Fetcher) recordTime(method string) func() {
Expand All @@ -71,3 +76,8 @@ func (m *MeteredL1Fetcher) GoOrUpdatePreFetchReceipts(ctx context.Context, l1Sta
defer m.recordTime("GoOrUpdatePreFetchReceipts")()
return m.inner.GoOrUpdatePreFetchReceipts(ctx, l1StartBlock)
}

func (m *MeteredL1Fetcher) ClearReceiptsCacheBefore(blockNumber uint64) {
defer m.recordTime("ClearReceiptsCacheBefore")()
m.inner.ClearReceiptsCacheBefore(blockNumber)
}
1 change: 1 addition & 0 deletions op-node/rollup/driver/origin_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (los *L1OriginSelector) FindL1Origin(ctx context.Context, l2Head eth.L2Bloc
_, _, err = los.l1.FetchReceipts(receiptsCtx, nextOrigin.Hash)
if err != nil {
receiptsCached = false
log.Warn("Fetch receipts cache missed when sequencer building block")
}

// If the next L2 block time is greater than the next origin block's time, we can choose to
Expand Down
Loading

0 comments on commit 7c6c489

Please sign in to comment.