Merge pull request #84 from EspressoSystems/fix/decreasing-l1-block-number

Handle decreasing L1 block number
jbearer authored Oct 6, 2023
2 parents 57bd549 + 8b53652 commit 008843e
Showing 6 changed files with 180 additions and 84 deletions.
135 changes: 79 additions & 56 deletions etherman/etherman.go
@@ -63,6 +63,8 @@ var (
"please check the [Etherman] PrivateKeyPath and PrivateKeyPassword configuration")
// ErrPrivateKeyNotFound used when the provided sender does not have a private key registered to be used
ErrPrivateKeyNotFound = errors.New("can't find sender private key to sign tx")
// ErrSkipBatch indicates that we have seen an old batch and should skip re-processing it
ErrSkipBatch = errors.New("skip old batch")
)

// SequencedBatchesSigHash returns the hash for the `SequenceBatches` event.
@@ -247,7 +249,7 @@ func (etherMan *Client) GetForks(ctx context.Context) ([]state.ForkIDInterval, e

// GetRollupInfoByBlockRange retrieves the rollup information included in all the Ethereum blocks
// from block x to block y.
func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64, prevBatch state.L2BatchInfo, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
// Filter query
query := ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(fromBlock),
@@ -256,7 +258,7 @@ func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock
if toBlock != nil {
query.ToBlock = new(big.Int).SetUint64(*toBlock)
}
blocks, blocksOrder, err := etherMan.readEvents(ctx, query, usePreconfirmations)
blocks, blocksOrder, err := etherMan.readEvents(ctx, prevBatch, query, usePreconfirmations)
if err != nil {
return nil, nil, err
}
@@ -269,15 +271,15 @@ type Order struct {
Pos int
}

func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQuery, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
func (etherMan *Client) readEvents(ctx context.Context, prevBatch state.L2BatchInfo, query ethereum.FilterQuery, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
logs, err := etherMan.EthClient.FilterLogs(ctx, query)
if err != nil {
return nil, nil, err
}
var blocks []Block
blocksOrder := make(map[common.Hash][]Order)
for _, vLog := range logs {
err := etherMan.processEvent(ctx, vLog, &blocks, &blocksOrder, usePreconfirmations)
err := etherMan.processEvent(ctx, &prevBatch, vLog, &blocks, &blocksOrder, usePreconfirmations)
if err != nil {
log.Warnf("error processing event. Retrying... Error: %s. vLog: %+v", err.Error(), vLog)
return nil, nil, err
@@ -286,7 +288,7 @@ func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQue
return blocks, blocksOrder, nil
}

func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order, usePreconfirmations bool) error {
func (etherMan *Client) processEvent(ctx context.Context, prevBatch *state.L2BatchInfo, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order, usePreconfirmations bool) error {
switch vLog.Topics[0] {
case newBlocksSignatureHash:
if usePreconfirmations {
@@ -295,7 +297,7 @@ func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks
// L1 (which happens later).
return nil
} else {
return etherMan.newBlocksEvent(ctx, vLog, blocks, blocksOrder)
return etherMan.newBlocksEvent(ctx, prevBatch, vLog, blocks, blocksOrder)
}
case updateGlobalExitRootSignatureHash:
return etherMan.updateGlobalExitRootEvent(ctx, vLog, blocks, blocksOrder)
@@ -498,7 +500,7 @@ func (etherMan *Client) getMaxPreconfirmation() (uint64, error) {
return blockHeight, nil
}

func (etherMan *Client) GetPreconfirmations(ctx context.Context, fromL2Batch uint64) ([]Block, map[common.Hash][]Order, error) {
func (etherMan *Client) GetPreconfirmations(ctx context.Context, prevBatch state.L2BatchInfo) ([]Block, map[common.Hash][]Order, error) {
hotShotBlockHeight, err := etherMan.getMaxPreconfirmation()
if err != nil {
return nil, nil, err
@@ -507,18 +509,20 @@ func (etherMan *Client) GetPreconfirmations(ctx context.Context, fromL2Batch uin
var blocks []Block
order := make(map[common.Hash][]Order)

fromHotShotBlock := fromL2Batch + etherMan.cfg.GenesisHotShotBlockNumber
// Start fetching from the next L2 batch (prevBatch.Number + 1), adjusting batch numbers to
// HotShot block numbers by offsetting by the HotShot block height at L2 genesis time.
fromHotShotBlock := prevBatch.Number + 1 + etherMan.cfg.GenesisHotShotBlockNumber
log.Infof("Getting HotShot blocks in range %d - %d", fromHotShotBlock, hotShotBlockHeight)
for hotShotBlockNum := fromHotShotBlock; hotShotBlockNum < hotShotBlockHeight; hotShotBlockNum++ {
var batch SequencedBatch
var l1BlockNum uint64

err = etherMan.fetchL2Block(ctx, hotShotBlockNum, &batch, &l1BlockNum)
if err != nil {
err = etherMan.fetchL2Block(ctx, hotShotBlockNum, &prevBatch, &batch)
if errors.Is(err, ErrSkipBatch) {
continue
} else if err != nil {
return nil, nil, err
}

err = etherMan.appendSequencedBatches(ctx, []SequencedBatch{batch}, l1BlockNum, &blocks, &order)
err = etherMan.appendSequencedBatches(ctx, []SequencedBatch{batch}, batch.BlockNumber, &blocks, &order)
if err != nil {
return nil, nil, err
}
@@ -527,7 +531,7 @@ func (etherMan *Client) GetPreconfirmations(ctx context.Context, fromL2Batch uin
return blocks, order, nil
}

func (etherMan *Client) newBlocksEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
func (etherMan *Client) newBlocksEvent(ctx context.Context, prevBatch *state.L2BatchInfo, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
newBlocks, err := etherMan.HotShot.ParseNewBlocks(vLog)
if err != nil {
return err
@@ -545,7 +549,7 @@ func (etherMan *Client) newBlocksEvent(ctx context.Context, vLog types.Log, bloc
if err != nil {
return err
}
sequences, err := etherMan.decodeSequencesHotShot(ctx, tx.Data(), *newBlocks, msg.From, msg.Nonce)
sequences, err := etherMan.decodeSequencesHotShot(ctx, prevBatch, tx.Data(), *newBlocks, msg.From, msg.Nonce)
if err != nil {
return fmt.Errorf("error decoding the sequences: %v", err)
}
@@ -558,18 +562,20 @@

func (etherMan *Client) appendSequencedBatches(ctx context.Context, sequences []SequencedBatch, blockNumber uint64, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
if len(*blocks) == 0 || (*blocks)[len(*blocks)-1].BlockNumber != blockNumber {
// Sanity check: if we got a new L1 block number, it should be increasing.
if len(*blocks) > 0 && blockNumber < (*blocks)[len(*blocks)-1].BlockNumber {
log.Fatalf("L1 block number decreased from %d to %d", (*blocks)[len(*blocks)-1].BlockNumber, blockNumber)
}

fullBlock, err := etherMan.EthBlockByNumber(ctx, blockNumber)
if err != nil {
return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %w", blockNumber, err)
}
block := prepareBlock(fullBlock)
block.SequencedBatches = append(block.SequencedBatches, sequences)
*blocks = append(*blocks, block)
} else if (*blocks)[len(*blocks)-1].BlockNumber == blockNumber {
(*blocks)[len(*blocks)-1].SequencedBatches = append((*blocks)[len(*blocks)-1].SequencedBatches, sequences)
} else {
log.Error("Error processing SequencedBatches event. BlockNumber: ", blockNumber)
return fmt.Errorf("error processing SequencedBatches event")
(*blocks)[len(*blocks)-1].SequencedBatches = append((*blocks)[len(*blocks)-1].SequencedBatches, sequences)
}
or := Order{
Name: SequenceBatchesOrder,
@@ -579,25 +585,7 @@ func (etherMan *Client) appendSequencedBatches(ctx context.Context, sequences []
return nil
}

func (etherMan *Client) l1BlockFromL2Block(ctx context.Context, l2Block SequencerBlock) (*types.Block, error) {
// Each L2 block must be associated with a unique L1 block, so that we know which global exit
// root (maintained on L1) to use when executing bridge withdrawals on L2. In the original
// Polygon zkEVM, this association is determined whenever an L2 batch is sequenced on L1. This
// design makes it impossible to use the HotShot sequencer for fast preconfirmations, because
// even though a canonical ordering of L2 blocks is determined quickly, we cannot execute those
// blocks until they have been persisted on L1, which can be slow.
//
// To enable fast preconfirmations, we redefine the way in which L2 blocks get associated with
// L1 blocks. Each time an L2 block is _sequenced_, the HotShot consensus protocol assigns it an
// L1 block number, which is guaranteed to be a recent L1 block number by a quorum of the stake.
// This means each L2 block is *immediately* associated with an L1 block in a deterministic and
// unequivocal way. We use this association when executing the block and later when proving it,
// so there is no need to wait for the block to be sent to L1 in order to compute the resulting
// state.
return etherMan.EthBlockByNumber(ctx, l2Block.L1Block)
}

func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, txData []byte, newBlocks ihotshot.IhotshotNewBlocks, sequencer common.Address, nonce uint64) ([]SequencedBatch, error) {
func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, prevBatch *state.L2BatchInfo, txData []byte, newBlocks ihotshot.IhotshotNewBlocks, sequencer common.Address, nonce uint64) ([]SequencedBatch, error) {

// Get number of batches by parsing transaction
numNewBatches := newBlocks.NumBlocks.Uint64()
@@ -616,8 +604,10 @@ func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, txData []byt
continue
}
newBatch := SequencedBatch{}
err := etherMan.fetchL2Block(ctx, curHotShotBlockNum, &newBatch, nil)
if err != nil {
err := etherMan.fetchL2Block(ctx, curHotShotBlockNum, prevBatch, &newBatch)
if errors.Is(err, ErrSkipBatch) {
continue
} else if err != nil {
return nil, err
}
sequencedBatches = append(sequencedBatches, newBatch)
Expand All @@ -626,7 +616,7 @@ func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, txData []byt
return sequencedBatches, nil
}

func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64, batch *SequencedBatch, l1BlockNum *uint64) error {
func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64, prevBatch *state.L2BatchInfo, batch *SequencedBatch) error {
// Get transactions and metadata from HotShot query service
hotShotBlockNumStr := strconv.FormatUint(hotShotBlockNum, 10)

@@ -678,6 +668,50 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
panic(err)
}

batchNum := hotShotBlockNum - etherMan.cfg.GenesisHotShotBlockNumber
log.Infof("Creating batch number %d", batchNum)

// Check that we got the expected batch.
if batchNum < prevBatch.Number + 1 {
// We got a batch which is older than expected. This means we have somehow already processed
// this batch. This should not be possible, since each batch is included exactly once in the
// batch stream, and the only case where we process the same section of the batch stream
// twice is when retrying after an error, in which case we should have reverted any changes
// we made to the state when processing the first time.
//
// If this happens, it is indicative of a programming error or a corrupt block stream, so we
// will complain loudly. However, there is no actual harm done: since we have ostensibly
// already processed this batch, we can just skip it and continue to make progress.
log.Errorf("received old batch %d, prev batch is %v", batchNum, prevBatch)
return ErrSkipBatch
} else if batchNum > prevBatch.Number + 1 {
// In this case we have somehow skipped a batch. This should also not be possible, because
// we always process batches sequentially. This is indicative of a corrupt DB. All we can do
// is return an error to trigger a retry in the caller.
return fmt.Errorf("received batch %d from the future, prev batch is %v", batchNum, prevBatch)
}

// Adjust L1 block and timestamp as needed.
//
// This should not be necessary, since HotShot should enforce non-decreasing timestamps and L1
// block numbers. However, since HotShot does not currently support the ValidatedState API,
// timestamps and L1 block numbers proposed by leaders are not checked by replicas, and may
// occasionally decrease. In this case, just use the previous value, to avoid breaking the rest
// of the execution pipeline.
if l2Block.L1Block < prevBatch.L1Block {
log.Warnf("HotShot block %d has decreasing L1Block: %d-%d", l2Block.Height, prevBatch.L1Block, l2Block.L1Block)
l2Block.L1Block = prevBatch.L1Block
}
if l2Block.Timestamp < prevBatch.Timestamp {
log.Warnf("HotShot block %d has decreasing timestamp: %d-%d", l2Block.Height, prevBatch.Timestamp, l2Block.Timestamp)
l2Block.Timestamp = prevBatch.Timestamp
}
*prevBatch = state.L2BatchInfo{
Number: batchNum,
L1Block: l2Block.L1Block,
Timestamp: l2Block.Timestamp,
}

log.Infof(
"Fetched L1 block %d, hotshot block: %d, timestamp %v, transactions %v",
l2Block.L1Block,
@@ -691,14 +725,8 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
panic(err)
}

// Fetch L1 state corresponding to this L2 block (e.g. global exit root)
l1Block, err := etherMan.l1BlockFromL2Block(ctx, l2Block)
if err != nil {
return err
}

var ger [32]byte
code, err := etherMan.EthClient.CodeAt(ctx, etherMan.cfg.GlobalExitRootManagerAddr, l1Block.Number())
code, err := etherMan.EthClient.CodeAt(ctx, etherMan.cfg.GlobalExitRootManagerAddr, big.NewInt(int64(l2Block.L1Block)))
if err != nil {
return err
}
@@ -708,10 +736,10 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
// should not contain any transactions for this L2, since they were created before the L2
// was deployed. In this case it doesn't matter what global exit root we use.
if len(txns) != 0 {
return fmt.Errorf("block %v (L1 block %v) contains L2 transactions from before GlobalExitRootManager was deployed", hotShotBlockNum, l1Block.Number())
return fmt.Errorf("block %v (L1 block %v) contains L2 transactions from before GlobalExitRootManager was deployed", hotShotBlockNum, l2Block.L1Block)
}
} else {
ger, err = etherMan.GlobalExitRootManager.GetLastGlobalExitRoot(&bind.CallOpts{BlockNumber: l1Block.Number()})
ger, err = etherMan.GlobalExitRootManager.GetLastGlobalExitRoot(&bind.CallOpts{BlockNumber: big.NewInt(int64(l2Block.L1Block))})
if err != nil {
return err
}
@@ -723,11 +751,9 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
Timestamp: l2Block.Timestamp,
}

batchNum := hotShotBlockNum - etherMan.cfg.GenesisHotShotBlockNumber
log.Infof("Creating batch number %d", batchNum)

*batch = SequencedBatch{
BatchNumber: batchNum,
BlockNumber: l2Block.L1Block,
PolygonZkEVMBatchData: newBatchData, // BatchData info

// Some metadata (in particular: information about the L1 transaction which sequenced this
Expand All @@ -740,9 +766,6 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
Coinbase: common.Address{},
Nonce: 0,
}
if l1BlockNum != nil {
*l1BlockNum = l1Block.Number().Uint64()
}

return nil
}
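
The core of this change is the pair of guards added to fetchL2Block: a sequencing check against the previous batch, and a clamp that keeps the HotShot-assigned L1 block number and timestamp non-decreasing. Below is a minimal, self-contained sketch of that logic; checkAndClamp and its trimmed-down types are names invented here for illustration, not the exact code from this diff.

package sketch

import (
	"errors"
	"fmt"
	"log"
)

// L2BatchInfo mirrors state.L2BatchInfo introduced by this PR.
type L2BatchInfo struct {
	Number    uint64
	L1Block   uint64
	Timestamp uint64
}

// ErrSkipBatch mirrors the sentinel error added in etherman.go.
var ErrSkipBatch = errors.New("skip old batch")

// checkAndClamp applies the two guards from fetchL2Block to a freshly
// fetched batch. An already-processed batch yields ErrSkipBatch (the caller
// skips it), a batch from the future yields an error (the caller retries),
// and otherwise decreasing L1 values are clamped to the previous batch's
// values before prevBatch is advanced.
func checkAndClamp(prevBatch *L2BatchInfo, batchNum, l1Block, timestamp uint64) (uint64, uint64, error) {
	switch {
	case batchNum < prevBatch.Number+1:
		// Seen before: complain loudly, but no harm done; skip it.
		log.Printf("received old batch %d, prev batch is %+v", batchNum, *prevBatch)
		return 0, 0, ErrSkipBatch
	case batchNum > prevBatch.Number+1:
		// A gap indicates a corrupt DB or block stream; trigger a retry.
		return 0, 0, fmt.Errorf("received batch %d from the future, prev batch is %+v", batchNum, *prevBatch)
	}
	// HotShot does not yet validate leader-proposed timestamps and L1 block
	// numbers, so they may occasionally decrease; reuse the previous value
	// rather than break the rest of the execution pipeline.
	if l1Block < prevBatch.L1Block {
		l1Block = prevBatch.L1Block
	}
	if timestamp < prevBatch.Timestamp {
		timestamp = prevBatch.Timestamp
	}
	*prevBatch = L2BatchInfo{Number: batchNum, L1Block: l1Block, Timestamp: timestamp}
	return l1Block, timestamp, nil
}
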
1 change: 1 addition & 0 deletions etherman/types.go
@@ -38,6 +38,7 @@ type GlobalExitRoot struct {
// SequencedBatch represents a virtual batch
type SequencedBatch struct {
BatchNumber uint64
BlockNumber uint64
SequencerAddr common.Address
TxHash common.Hash
Nonce uint64
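
The new BlockNumber field records the L1 block that HotShot assigned to the batch at sequencing time, which is what appendSequencedBatches now groups on. A short sketch of that grouping, under simplified types assumed here for illustration:

package sketch

import "log"

// SequencedBatch and Block are trimmed to the fields this sketch needs.
type SequencedBatch struct {
	BatchNumber uint64
	BlockNumber uint64 // L1 block assigned by HotShot at sequencing time
}

type Block struct {
	BlockNumber      uint64
	SequencedBatches [][]SequencedBatch
}

// appendBatches groups consecutive batches by L1 block number: a repeated
// number extends the last Block, a larger number opens a new Block, and a
// smaller number is fatal (the sanity check added by this PR).
func appendBatches(blocks []Block, seqs []SequencedBatch, blockNumber uint64) []Block {
	last := len(blocks) - 1
	if last < 0 || blocks[last].BlockNumber != blockNumber {
		if last >= 0 && blockNumber < blocks[last].BlockNumber {
			log.Fatalf("L1 block number decreased from %d to %d", blocks[last].BlockNumber, blockNumber)
		}
		return append(blocks, Block{
			BlockNumber:      blockNumber,
			SequencedBatches: [][]SequencedBatch{seqs},
		})
	}
	blocks[last].SequencedBatches = append(blocks[last].SequencedBatches, seqs)
	return blocks
}
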
31 changes: 31 additions & 0 deletions state/pgstatestorage.go
@@ -27,6 +27,7 @@ const (
addBlockSQL = "INSERT INTO state.block (block_num, block_hash, parent_hash, received_at) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT block_num FROM state.block WHERE block_num = $1)"
getLastBlockSQL = "SELECT block_num, block_hash, parent_hash, received_at FROM state.block ORDER BY block_num DESC LIMIT 1"
getPreviousBlockSQL = "SELECT block_num, block_hash, parent_hash, received_at FROM state.block ORDER BY block_num DESC LIMIT 1 OFFSET $1"
getLastBatchInfoSQL = "SELECT v.batch_num, v.block_num, b.timestamp FROM state.batch AS b JOIN state.virtual_batch AS v ON b.batch_num = v.batch_num ORDER BY b.batch_num DESC LIMIT 1"
getLastBatchNumberSQL = "SELECT batch_num FROM state.batch ORDER BY batch_num DESC LIMIT 1"
getLastNBatchesSQL = "SELECT batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num from state.batch ORDER BY batch_num DESC LIMIT $1"
getLastBatchTimeSQL = "SELECT timestamp FROM state.batch ORDER BY batch_num DESC LIMIT 1"
@@ -492,6 +493,22 @@ func (p *PostgresStorage) GetLastNBatchesByL2BlockNumber(ctx context.Context, l2
return batches, *l2BlockStateRoot, nil
}

// GetLastBatchInfo gets the last trusted batch info
func (p *PostgresStorage) GetLastBatchInfo(ctx context.Context, dbTx pgx.Tx) (L2BatchInfo, error) {
var info L2BatchInfo
var timestamp time.Time

q := p.getExecQuerier(dbTx)

err := q.QueryRow(ctx, getLastBatchInfoSQL).Scan(&info.Number, &info.L1Block, &timestamp)
if errors.Is(err, pgx.ErrNoRows) {
return info, ErrStateNotSynchronized
}
info.Timestamp = uint64(timestamp.Unix())

return info, err
}

// GetLastBatchNumber gets the last trusted batch number
func (p *PostgresStorage) GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
var batchNumber uint64
@@ -556,6 +573,20 @@ func (p *PostgresStorage) SetLastBatchNumberSeenOnEthereum(ctx context.Context,
return err
}

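// ContainsBlock reports whether a block with the given L1 block number has already been stored in state.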
func (p *PostgresStorage) ContainsBlock(ctx context.Context, blockNum uint64, dbTx pgx.Tx) (bool, error) {
const getBlockByNumberSQL = `
SELECT count(*)
FROM state.block
WHERE block_num = $1`
var count uint64

e := p.getExecQuerier(dbTx)
if err := e.QueryRow(ctx, getBlockByNumberSQL, blockNum).Scan(&count); err != nil {
return false, err
}
return count > 0, nil
}

// GetLastBatchNumberSeenOnEthereum returns the last batch number stored
// in the state that represents the last batch number that affected the
// roll-up in the Ethereum network.
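
GetLastBatchInfo is what lets the synchronizer seed the prevBatch argument threaded through etherman above. A hedged sketch of a startup path that might consume it; the caller and the interface are illustrative, and the real method also takes a pgx.Tx, omitted here:

package sketch

import (
	"context"
	"errors"
)

// L2BatchInfo mirrors state.L2BatchInfo.
type L2BatchInfo struct {
	Number    uint64
	L1Block   uint64
	Timestamp uint64
}

// ErrStateNotSynchronized mirrors the state package's sentinel error.
var ErrStateNotSynchronized = errors.New("state not synchronized")

// batchInfoStorage abstracts the one PostgresStorage method this sketch uses.
type batchInfoStorage interface {
	GetLastBatchInfo(ctx context.Context) (L2BatchInfo, error)
}

// seedPrevBatch loads the last processed batch, falling back to the zero
// value (batch 0, L1 block 0, timestamp 0) when nothing has been synced yet.
func seedPrevBatch(ctx context.Context, st batchInfoStorage) (L2BatchInfo, error) {
	info, err := st.GetLastBatchInfo(ctx)
	if errors.Is(err, ErrStateNotSynchronized) {
		return L2BatchInfo{}, nil
	}
	return info, err
}
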
6 changes: 6 additions & 0 deletions state/types.go
@@ -152,3 +152,9 @@ type DebugInfo struct {
Timestamp time.Time
Payload string
}

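// L2BatchInfo summarizes the most recently processed batch: its number plus
// the L1 block and timestamp HotShot assigned to it.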
type L2BatchInfo struct {
Number uint64
L1Block uint64
Timestamp uint64
}
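
Taken together, the checks in fetchL2Block maintain a simple invariant over successive L2BatchInfo values: batch numbers advance by exactly one, while L1 block and timestamp never decrease. Stated as a hedged sketch (validateNext is a name invented here, not part of this diff):

package sketch

import "fmt"

type L2BatchInfo struct {
	Number    uint64
	L1Block   uint64
	Timestamp uint64
}

// validateNext checks the monotonicity invariant between two consecutive
// batches after clamping has been applied.
func validateNext(prev, next L2BatchInfo) error {
	if next.Number != prev.Number+1 {
		return fmt.Errorf("batch number must advance by 1: prev %d, next %d", prev.Number, next.Number)
	}
	if next.L1Block < prev.L1Block {
		return fmt.Errorf("L1 block decreased: prev %d, next %d", prev.L1Block, next.L1Block)
	}
	if next.Timestamp < prev.Timestamp {
		return fmt.Errorf("timestamp decreased: prev %d, next %d", prev.Timestamp, next.Timestamp)
	}
	return nil
}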