diff --git a/etherman/etherman.go b/etherman/etherman.go
index 7e0bc08..e240dbf 100644
--- a/etherman/etherman.go
+++ b/etherman/etherman.go
@@ -63,6 +63,8 @@ var (
         "please check the [Etherman] PrivateKeyPath and PrivateKeyPassword configuration")
     // ErrPrivateKeyNotFound used when the provided sender does not have a private key registered to be used
     ErrPrivateKeyNotFound = errors.New("can't find sender private key to sign tx")
+    // ErrSkipBatch indicates that we have seen an old batch and should skip re-processing it
+    ErrSkipBatch = errors.New("skip old batch")
 )
 
 // SequencedBatchesSigHash returns the hash for the `SequenceBatches` event.
@@ -247,7 +249,7 @@ func (etherMan *Client) GetForks(ctx context.Context) ([]state.ForkIDInterval, e
 
 // GetRollupInfoByBlockRange function retrieves the Rollup information that are included in all this ethereum blocks
 // from block x to block y.
-func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
+func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64, prevBatch state.L2BatchInfo, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
     // Filter query
     query := ethereum.FilterQuery{
         FromBlock: new(big.Int).SetUint64(fromBlock),
@@ -256,7 +258,7 @@ func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock
     if toBlock != nil {
         query.ToBlock = new(big.Int).SetUint64(*toBlock)
     }
-    blocks, blocksOrder, err := etherMan.readEvents(ctx, query, usePreconfirmations)
+    blocks, blocksOrder, err := etherMan.readEvents(ctx, prevBatch, query, usePreconfirmations)
     if err != nil {
         return nil, nil, err
     }
@@ -269,7 +271,7 @@ type Order struct {
     Pos  int
 }
-func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQuery, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
+func (etherMan *Client) readEvents(ctx context.Context, prevBatch state.L2BatchInfo, query ethereum.FilterQuery, usePreconfirmations bool) ([]Block, map[common.Hash][]Order, error) {
     logs, err := etherMan.EthClient.FilterLogs(ctx, query)
     if err != nil {
         return nil, nil, err
     }
@@ -277,7 +279,7 @@ func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQue
     var blocks []Block
     blocksOrder := make(map[common.Hash][]Order)
     for _, vLog := range logs {
-        err := etherMan.processEvent(ctx, vLog, &blocks, &blocksOrder, usePreconfirmations)
+        err := etherMan.processEvent(ctx, &prevBatch, vLog, &blocks, &blocksOrder, usePreconfirmations)
         if err != nil {
             log.Warnf("error processing event. Retrying... Error: %s. vLog: %+v", err.Error(), vLog)
             return nil, nil, err
         }
@@ -286,7 +288,7 @@ func (etherMan *Client) readEvents(ctx context.Context, query ethereum.FilterQue
     return blocks, blocksOrder, nil
 }
 
-func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order, usePreconfirmations bool) error {
+func (etherMan *Client) processEvent(ctx context.Context, prevBatch *state.L2BatchInfo, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order, usePreconfirmations bool) error {
     switch vLog.Topics[0] {
     case newBlocksSignatureHash:
         if usePreconfirmations {
@@ -295,7 +297,7 @@ func (etherMan *Client) processEvent(ctx context.Context, vLog types.Log, blocks
             // L1 (which happens later).
             return nil
         } else {
-            return etherMan.newBlocksEvent(ctx, vLog, blocks, blocksOrder)
+            return etherMan.newBlocksEvent(ctx, prevBatch, vLog, blocks, blocksOrder)
         }
     case updateGlobalExitRootSignatureHash:
         return etherMan.updateGlobalExitRootEvent(ctx, vLog, blocks, blocksOrder)
@@ -498,7 +500,7 @@ func (etherMan *Client) getMaxPreconfirmation() (uint64, error) {
     return blockHeight, nil
 }
-func (etherMan *Client) GetPreconfirmations(ctx context.Context, fromL2Batch uint64) ([]Block, map[common.Hash][]Order, error) {
+func (etherMan *Client) GetPreconfirmations(ctx context.Context, prevBatch state.L2BatchInfo) ([]Block, map[common.Hash][]Order, error) {
     hotShotBlockHeight, err := etherMan.getMaxPreconfirmation()
     if err != nil {
         return nil, nil, err
     }
@@ -507,18 +509,20 @@ func (etherMan *Client) GetPreconfirmations(ctx context.Context, fromL2Batch uin
     var blocks []Block
     order := make(map[common.Hash][]Order)
 
-    fromHotShotBlock := fromL2Batch + etherMan.cfg.GenesisHotShotBlockNumber
+    // Start fetching from the next L2 batch (prevBatch.Number + 1), adjusting batch numbers to
+    // HotShot block numbers by offsetting by the HotShot block height at L2 genesis time.
+    fromHotShotBlock := prevBatch.Number + 1 + etherMan.cfg.GenesisHotShotBlockNumber
     log.Infof("Getting HotShot blocks in range %d - %d", fromHotShotBlock, hotShotBlockHeight)
     for hotShotBlockNum := fromHotShotBlock; hotShotBlockNum < hotShotBlockHeight; hotShotBlockNum++ {
         var batch SequencedBatch
-        var l1BlockNum uint64
-
-        err = etherMan.fetchL2Block(ctx, hotShotBlockNum, &batch, &l1BlockNum)
-        if err != nil {
+        err = etherMan.fetchL2Block(ctx, hotShotBlockNum, &prevBatch, &batch)
+        if errors.Is(err, ErrSkipBatch) {
+            continue
+        } else if err != nil {
             return nil, nil, err
         }
-        err = etherMan.appendSequencedBatches(ctx, []SequencedBatch{batch}, l1BlockNum, &blocks, &order)
+        err = etherMan.appendSequencedBatches(ctx, []SequencedBatch{batch}, batch.BlockNumber, &blocks, &order)
         if err != nil {
             return nil, nil, err
         }
@@ -527,7 +531,7 @@ func (etherMan *Client) GetPreconfirmations(ctx context.Context, fromL2Batch uin
     return blocks, order, nil
 }
-func (etherMan *Client) newBlocksEvent(ctx context.Context, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
+func (etherMan *Client) newBlocksEvent(ctx context.Context, prevBatch *state.L2BatchInfo, vLog types.Log, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
     newBlocks, err := etherMan.HotShot.ParseNewBlocks(vLog)
     if err != nil {
         return err
     }
@@ -545,7 +549,7 @@ func (etherMan *Client) newBlocksEvent(ctx context.Context, vLog types.Log, bloc
     if err != nil {
         return err
     }
-    sequences, err := etherMan.decodeSequencesHotShot(ctx, tx.Data(), *newBlocks, msg.From, msg.Nonce)
+    sequences, err := etherMan.decodeSequencesHotShot(ctx, prevBatch, tx.Data(), *newBlocks, msg.From, msg.Nonce)
     if err != nil {
         return fmt.Errorf("error decoding the sequences: %v", err)
     }
@@ -558,6 +562,11 @@ func (etherMan *Client) newBlocksEvent(ctx context.Context, vLog types.Log, bloc
 
 func (etherMan *Client) appendSequencedBatches(ctx context.Context, sequences []SequencedBatch, blockNumber uint64, blocks *[]Block, blocksOrder *map[common.Hash][]Order) error {
     if len(*blocks) == 0 || (*blocks)[len(*blocks)-1].BlockNumber != blockNumber {
+        // Sanity check: if we got a new L1 block number, it should be increasing.
+        if len(*blocks) > 0 && blockNumber < (*blocks)[len(*blocks)-1].BlockNumber {
+            log.Fatalf("L1 block number decreased from %d to %d", (*blocks)[len(*blocks)-1].BlockNumber, blockNumber)
+        }
+
         fullBlock, err := etherMan.EthBlockByNumber(ctx, blockNumber)
         if err != nil {
             return fmt.Errorf("error getting hashParent. BlockNumber: %d. Error: %w", blockNumber, err)
@@ -565,11 +574,8 @@ func (etherMan *Client) appendSequencedBatches(ctx context.Context, sequences []
         block := prepareBlock(fullBlock)
         block.SequencedBatches = append(block.SequencedBatches, sequences)
         *blocks = append(*blocks, block)
-    } else if (*blocks)[len(*blocks)-1].BlockNumber == blockNumber {
-        (*blocks)[len(*blocks)-1].SequencedBatches = append((*blocks)[len(*blocks)-1].SequencedBatches, sequences)
     } else {
-        log.Error("Error processing SequencedBatches event. BlockNumber: ", blockNumber)
-        return fmt.Errorf("error processing SequencedBatches event")
+        (*blocks)[len(*blocks)-1].SequencedBatches = append((*blocks)[len(*blocks)-1].SequencedBatches, sequences)
     }
     or := Order{
         Name: SequenceBatchesOrder,
@@ -579,25 +585,7 @@ func (etherMan *Client) appendSequencedBatches(ctx context.Context, sequences []
     return nil
 }
 
-func (etherMan *Client) l1BlockFromL2Block(ctx context.Context, l2Block SequencerBlock) (*types.Block, error) {
-    // Each L2 block must be associated with a unique L1 block, so that we know which global exit
-    // root (maintained on L1) to use when executing bridge withdrawals on L2. In the original
-    // Polygon zkEVM, this association is determined whenever an L2 batch is sequenced on L1. This
-    // design makes it impossible to use the HotShot sequencer for fast preconfirmations, because
-    // even though a canonical ordering of L2 blocks is determined quickly, we cannot execute those
-    // blocks until they have been persisted on L1, which can be slow.
-    //
-    // To enable fast preconfirmations, we redefine the way in which L2 blocks get associated with
-    // L1 blocks. Each time an L2 block is _sequenced_, the HotShot consensus protocol assigns it an
-    // L1 block number, which is guaranteed to be a recent L1 block number by a quorum of the stake.
-    // This means each L2 block is *immediately* associated with an L1 block in a determinstic and
-    // unequivocal way. We use this association when executing the block and later when proving it,
-    // so there is no need to wait for the block to be sent to L1 in order to compute the resulting
-    // state.
-    return etherMan.EthBlockByNumber(ctx, l2Block.L1Block)
-}
-
-func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, txData []byte, newBlocks ihotshot.IhotshotNewBlocks, sequencer common.Address, nonce uint64) ([]SequencedBatch, error) {
+func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, prevBatch *state.L2BatchInfo, txData []byte, newBlocks ihotshot.IhotshotNewBlocks, sequencer common.Address, nonce uint64) ([]SequencedBatch, error) {
     // Get number of batches by parsing transaction
     numNewBatches := newBlocks.NumBlocks.Uint64()
 
@@ -616,8 +604,10 @@ func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, txData []byt
             continue
         }
         newBatch := SequencedBatch{}
-        err := etherMan.fetchL2Block(ctx, curHotShotBlockNum, &newBatch, nil)
-        if err != nil {
+        err := etherMan.fetchL2Block(ctx, curHotShotBlockNum, prevBatch, &newBatch)
+        if errors.Is(err, ErrSkipBatch) {
+            continue
+        } else if err != nil {
             return nil, err
         }
         sequencedBatches = append(sequencedBatches, newBatch)
@@ -626,7 +616,7 @@ func (etherMan *Client) decodeSequencesHotShot(ctx context.Context, txData []byt
     return sequencedBatches, nil
 }
 
-func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64, batch *SequencedBatch, l1BlockNum *uint64) error {
+func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64, prevBatch *state.L2BatchInfo, batch *SequencedBatch) error {
     // Get transactions and metadata from HotShot query service
     hotShotBlockNumStr := strconv.FormatUint(hotShotBlockNum, 10)
 
@@ -678,6 +668,50 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
         panic(err)
     }
 
+    batchNum := hotShotBlockNum - etherMan.cfg.GenesisHotShotBlockNumber
+    log.Infof("Creating batch number %d", batchNum)
+
+    // Check that we got the expected batch.
+    if batchNum < prevBatch.Number + 1 {
+        // We got a batch which is older than expected. This means we have somehow already processed
+        // this batch. This should not be possible, since each batch is included exactly once in the
+        // batch stream, and the only case where we process the same section of the batch stream
+        // twice is when retrying after an error, in which case we should have reverted any changes
+        // we made to the state when processing the first time.
+        //
+        // If this happens, it is indicative of a programming error or a corrupt block stream, so we
+        // will complain loudly. However, there is no actual harm done: since we have ostensibly
+        // already processed this batch, we can just skip it and continue to make progress.
+        log.Errorf("received old batch %d, prev batch is %v", batchNum, prevBatch)
+        return ErrSkipBatch
+    } else if batchNum > prevBatch.Number + 1 {
+        // In this case we have somehow skipped a batch. This should also not be possible, because
+        // we always process batches sequentially. This is indicative of a corrupt DB. All we can do
+        // is return an error to trigger a retry in the caller.
+        return fmt.Errorf("received batch %d from the future, prev batch is %v", batchNum, prevBatch)
+    }
+
+    // Adjust L1 block and timestamp as needed.
+    //
+    // This should not be necessary, since HotShot should enforce non-decreasing timestamps and L1
+    // block numbers. However, since HotShot does not currently support the ValidatedState API,
+    // timestamps and L1 block numbers proposed by leaders are not checked by replicas, and may
+    // occasionally decrease. In this case, just use the previous value, to avoid breaking the rest
+    // of the execution pipeline.
+    if l2Block.L1Block < prevBatch.L1Block {
+        log.Warnf("HotShot block %d has decreasing L1Block: %d-%d", l2Block.Height, prevBatch.L1Block, l2Block.L1Block)
+        l2Block.L1Block = prevBatch.L1Block
+    }
+    if l2Block.Timestamp < prevBatch.Timestamp {
+        log.Warnf("HotShot block %d has decreasing timestamp: %d-%d", l2Block.Height, prevBatch.Timestamp, l2Block.Timestamp)
+        l2Block.Timestamp = prevBatch.Timestamp
+    }
+    *prevBatch = state.L2BatchInfo{
+        Number:    batchNum,
+        L1Block:   l2Block.L1Block,
+        Timestamp: l2Block.Timestamp,
+    }
+
     log.Infof(
         "Fetched L1 block %d, hotshot block: %d, timestamp %v, transactions %v",
         l2Block.L1Block,
@@ -691,14 +725,8 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
         panic(err)
     }
 
-    // Fetch L1 state corresponding to this L2 block (e.g. global exit root)
-    l1Block, err := etherMan.l1BlockFromL2Block(ctx, l2Block)
-    if err != nil {
-        return err
-    }
-
     var ger [32]byte
-    code, err := etherMan.EthClient.CodeAt(ctx, etherMan.cfg.GlobalExitRootManagerAddr, l1Block.Number())
+    code, err := etherMan.EthClient.CodeAt(ctx, etherMan.cfg.GlobalExitRootManagerAddr, big.NewInt(int64(l2Block.L1Block)))
     if err != nil {
         return err
     }
@@ -708,10 +736,10 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
         // should not contain any transactions for this L2, since they were created before the L2
         // was deployed. In this case it doesn't matter what global exit root we use.
         if len(txns) != 0 {
-            return fmt.Errorf("block %v (L1 block %v) contains L2 transactions from before GlobalExitRootManager was deployed", hotShotBlockNum, l1Block.Number())
+            return fmt.Errorf("block %v (L1 block %v) contains L2 transactions from before GlobalExitRootManager was deployed", hotShotBlockNum, l2Block.L1Block)
         }
     } else {
-        ger, err = etherMan.GlobalExitRootManager.GetLastGlobalExitRoot(&bind.CallOpts{BlockNumber: l1Block.Number()})
+        ger, err = etherMan.GlobalExitRootManager.GetLastGlobalExitRoot(&bind.CallOpts{BlockNumber: big.NewInt(int64(l2Block.L1Block))})
         if err != nil {
             return err
         }
@@ -723,11 +751,9 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
         Timestamp: l2Block.Timestamp,
     }
 
-    batchNum := hotShotBlockNum - etherMan.cfg.GenesisHotShotBlockNumber
-    log.Infof("Creating batch number %d", batchNum)
-
     *batch = SequencedBatch{
         BatchNumber:           batchNum,
+        BlockNumber:           l2Block.L1Block,
         PolygonZkEVMBatchData: newBatchData, // BatchData info
 
         // Some metadata (in particular: information about the L1 transaction which sequenced this
@@ -740,9 +766,6 @@ func (etherMan *Client) fetchL2Block(ctx context.Context, hotShotBlockNum uint64
         Coinbase: common.Address{},
         Nonce:    0,
     }
-    if l1BlockNum != nil {
-        *l1BlockNum = l1Block.Number().Uint64()
-    }
 
     return nil
 }
diff --git a/etherman/types.go b/etherman/types.go
index 070a43e..eefdfda 100644
--- a/etherman/types.go
+++ b/etherman/types.go
@@ -38,6 +38,7 @@ type GlobalExitRoot struct {
 // SequencedBatch represents virtual batch
 type SequencedBatch struct {
     BatchNumber   uint64
+    BlockNumber   uint64
     SequencerAddr common.Address
     TxHash        common.Hash
     Nonce         uint64
diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go
index 72b6d2c..66a75f8 100644
--- a/state/pgstatestorage.go
+++ b/state/pgstatestorage.go
@@ -27,6 +27,7 @@ const (
     addBlockSQL = "INSERT INTO state.block (block_num, block_hash, parent_hash, received_at) SELECT $1, $2, $3, $4 WHERE NOT EXISTS (SELECT block_num FROM state.block WHERE block_num = $1)"
     getLastBlockSQL = "SELECT block_num, block_hash, parent_hash, received_at FROM state.block ORDER BY block_num DESC LIMIT 1"
     getPreviousBlockSQL = "SELECT block_num, block_hash, parent_hash, received_at FROM state.block ORDER BY block_num DESC LIMIT 1 OFFSET $1"
+    getLastBatchInfoSQL = "SELECT v.batch_num, v.block_num, b.timestamp FROM state.batch AS b JOIN state.virtual_batch AS v ON b.batch_num = v.batch_num ORDER BY b.batch_num DESC LIMIT 1"
     getLastBatchNumberSQL = "SELECT batch_num FROM state.batch ORDER BY batch_num DESC LIMIT 1"
     getLastNBatchesSQL = "SELECT batch_num, global_exit_root, local_exit_root, acc_input_hash, state_root, timestamp, coinbase, raw_txs_data, forced_batch_num from state.batch ORDER BY batch_num DESC LIMIT $1"
     getLastBatchTimeSQL = "SELECT timestamp FROM state.batch ORDER BY batch_num DESC LIMIT 1"
@@ -492,6 +493,22 @@ func (p *PostgresStorage) GetLastNBatchesByL2BlockNumber(ctx context.Context, l2
     return batches, *l2BlockStateRoot, nil
 }
 
+// GetLastBatchInfo get last trusted batch info
+func (p *PostgresStorage) GetLastBatchInfo(ctx context.Context, dbTx pgx.Tx) (L2BatchInfo, error) {
+    var info L2BatchInfo
+    var timestamp time.Time
+
+    q := p.getExecQuerier(dbTx)
+
+    err := q.QueryRow(ctx, getLastBatchInfoSQL).Scan(&info.Number, &info.L1Block, &timestamp)
+    if errors.Is(err, pgx.ErrNoRows) {
+        return info, ErrStateNotSynchronized
+    }
+    info.Timestamp = uint64(timestamp.Unix())
+
+    return info, err
+}
+
 // GetLastBatchNumber get last trusted batch number
 func (p *PostgresStorage) GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error) {
     var batchNumber uint64
@@ -556,6 +573,20 @@ func (p *PostgresStorage) SetLastBatchNumberSeenOnEthereum(ctx context.Context,
     return err
 }
 
+func (p *PostgresStorage) ContainsBlock(ctx context.Context, blockNum uint64, dbTx pgx.Tx) (bool, error) {
+    const getBlockByNumberSQL = `
+        SELECT count(*)
+        FROM state.block
+        WHERE block_num = $1`
+    var count uint64
+
+    e := p.getExecQuerier(dbTx)
+    if err := e.QueryRow(ctx, getBlockByNumberSQL, blockNum).Scan(&count); err != nil {
+        return false, err
+    }
+    return count > 0, nil
+}
+
 // GetLastBatchNumberSeenOnEthereum returns the last batch number stored
 // in the state that represents the last batch number that affected the
 // roll-up in the Ethereum network.
diff --git a/state/types.go b/state/types.go
index 39ffe97..b6a9a4c 100644
--- a/state/types.go
+++ b/state/types.go
@@ -152,3 +152,9 @@ type DebugInfo struct {
     Timestamp time.Time
     Payload   string
 }
+
+type L2BatchInfo struct {
+    Number    uint64
+    L1Block   uint64
+    Timestamp uint64
+}
diff --git a/synchronizer/interfaces.go b/synchronizer/interfaces.go
index e3ce7d9..b3ac5bd 100644
--- a/synchronizer/interfaces.go
+++ b/synchronizer/interfaces.go
@@ -16,12 +16,12 @@ import (
 // ethermanInterface contains the methods required to interact with ethereum.
 type ethermanInterface interface {
     HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
-    GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64, usePreconfirmations bool) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
+    GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64, prevBatch state.L2BatchInfo, usePreconfirmations bool) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
     EthBlockByNumber(ctx context.Context, blockNumber uint64) (*types.Block, error)
     GetLatestBatchNumber() (uint64, error)
     GetTrustedSequencerURL() (string, error)
     VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error)
-    GetPreconfirmations(ctx context.Context, fromL2Block uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
+    GetPreconfirmations(ctx context.Context, prevBatch state.L2BatchInfo) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
 }
 
 // stateInterface gathers the methods required to interact with the state.
@@ -30,8 +30,10 @@ type stateInterface interface {
     AddGlobalExitRoot(ctx context.Context, exitRoot *state.GlobalExitRoot, dbTx pgx.Tx) error
     AddForcedBatch(ctx context.Context, forcedBatch *state.ForcedBatch, dbTx pgx.Tx) error
     AddBlock(ctx context.Context, block *state.Block, dbTx pgx.Tx) error
+    ContainsBlock(ctx context.Context, blockNum uint64, dbTx pgx.Tx) (bool, error)
     Reset(ctx context.Context, blockNumber uint64, dbTx pgx.Tx) error
     GetPreviousBlock(ctx context.Context, offset uint64, dbTx pgx.Tx) (*state.Block, error)
+    GetLastBatchInfo(ctx context.Context, dbTx pgx.Tx) (state.L2BatchInfo, error)
     GetLastBatchNumber(ctx context.Context, dbTx pgx.Tx) (uint64, error)
     GetLastBatchTime(ctx context.Context, dbTx pgx.Tx) (time.Time, error)
     GetBatchByNumber(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) (*state.Batch, error)
diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go
index a413146..195519d 100644
--- a/synchronizer/synchronizer.go
+++ b/synchronizer/synchronizer.go
@@ -215,12 +215,19 @@ func (s *ClientSynchronizer) syncBlocks(lastEthBlockSynced *state.Block) (*state
         toBlock := fromBlock + s.cfg.SyncChunkSize
         log.Infof("Syncing L1 block %d of %d", fromBlock, lastKnownBlock.Uint64())
         log.Infof("Getting rollup info from L1 block %d to block %d", fromBlock, toBlock)
+
+        prevBatch, err := s.state.GetLastBatchInfo(s.ctx, nil)
+        if err != nil {
+            log.Warn("error getting latest batch synced. Error: ", err)
+            return lastEthBlockSynced, err
+        }
+
         // This function returns the rollup information contained in the ethereum blocks and an extra param called order.
         // Order param is a map that contains the event order to allow the synchronizer store the info in the same order that is readed.
         // Name can be defferent in the order struct. For instance: Batches or Name:NewSequencers. This name is an identifier to check
         // if the next info that must be stored in the db is a new sequencer or a batch. The value pos (position) tells what is the
         // array index where this value is.
-        blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(s.ctx, fromBlock, &toBlock, s.usePreconfirmations())
+        blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(s.ctx, fromBlock, &toBlock, prevBatch, s.usePreconfirmations())
         if err != nil {
             return lastEthBlockSynced, err
         }
@@ -277,16 +284,15 @@ func (s *ClientSynchronizer) syncBlocks(lastEthBlockSynced *state.Block) (*state
 func (s *ClientSynchronizer) syncPreconfirmations() error {
     for {
-        // Figure out where to start from: what is the first L2 block we haven't synchronized yet?
-        // This is the first batch after the last synchronized batch number.
-        latestSyncedBatch, err := s.state.GetLastBatchNumber(s.ctx, nil)
+        // Figure out where to start from.
+        latestSyncedBatch, err := s.state.GetLastBatchInfo(s.ctx, nil)
         if err != nil {
             log.Warn("error getting latest batch synced. Error: ", err)
             return err
         }
 
         // Fetch new preconfirmed blocks from the sequencer.
-        blocks, order, err := s.etherMan.GetPreconfirmations(s.ctx, latestSyncedBatch+1)
+        blocks, order, err := s.etherMan.GetPreconfirmations(s.ctx, latestSyncedBatch)
         if err != nil {
             log.Warn("error getting preconfirmations. Error: ", err)
             return err
         }
@@ -425,7 +431,7 @@ func (s *ClientSynchronizer) processBlockRange(blocks []etherman.Block, order ma
             for _, element := range order[blocks[i].BlockHash] {
                 switch element.Name {
                 case etherman.SequenceBatchesOrder:
-                    err = s.processSequenceBatches(blocks[i].SequencedBatches[element.Pos], blocks[i].BlockNumber, dbTx)
+                    err = s.processSequenceBatches(blocks[i].SequencedBatches[element.Pos], dbTx)
                     if err != nil {
                         return err
                     }
@@ -599,23 +605,60 @@ func (s *ClientSynchronizer) checkTrustedState(batch state.Batch, tBatch *state.
     return false
 }
 
-func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.SequencedBatch, blockNumber uint64, dbTx pgx.Tx) error {
+func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.SequencedBatch, dbTx pgx.Tx) error {
     if len(sequencedBatches) == 0 {
         log.Warn("Empty sequencedBatches array detected, ignoring...")
         return nil
     }
-    prevTimestamp, err := s.state.GetLastBatchTime(s.ctx, dbTx)
-    if err != nil {
-        log.Warn("Error fetching previous timestamp")
-        return err
-    }
 
     for _, sbatch := range sequencedBatches {
+        // Ensure the L1 origin for this batch is in the database. Since the L1 origin assigned by
+        // HotShot is not necessarily an L1 block that we added to the database in response to a
+        // contract event, it may not be present yet.
+        blockNumber := sbatch.BlockNumber
+        exists, err := s.state.ContainsBlock(s.ctx, blockNumber, dbTx)
+        if err != nil {
+            log.Errorf("error fetching L1 block %d from db: %v", blockNumber, err)
+            rollbackErr := dbTx.Rollback(s.ctx)
+            if rollbackErr != nil {
+                log.Fatalf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error: %w", blockNumber, rollbackErr.Error(), err)
+            }
+            return err
+        }
+        if !exists {
+            header, err := s.etherMan.HeaderByNumber(s.ctx, big.NewInt(int64(blockNumber)))
+            if err != nil {
+                log.Errorf("error fetching L1 block %d: %v", blockNumber, err)
+                rollbackErr := dbTx.Rollback(s.ctx)
+                if rollbackErr != nil {
+                    log.Fatalf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error: %w", blockNumber, rollbackErr.Error(), err)
+                }
+                return err
+            }
+            b := state.Block{
+                BlockNumber: blockNumber,
+                BlockHash:   header.Hash(),
+                ParentHash:  header.ParentHash,
+                ReceivedAt:  time.Unix(int64(header.Time), 0),
+            }
+            log.Infof("L1 block %d does not already exist, adding to database", blockNumber)
+            err = s.state.AddBlock(s.ctx, &b, dbTx)
+            if err != nil {
+                log.Errorf("error storing block. BlockNumber: %d, error: %w", blockNumber, err)
+                rollbackErr := dbTx.Rollback(s.ctx)
+                if rollbackErr != nil {
+                    log.Fatalf("error rolling back state to store block. BlockNumber: %d, rollbackErr: %s, error : %w", blockNumber, rollbackErr.Error(), err)
+                }
+                return err
+            }
+        }
+
         virtualBatch := state.VirtualBatch{
             BatchNumber:   sbatch.BatchNumber,
             TxHash:        sbatch.TxHash,
             Coinbase:      sbatch.Coinbase,
-            BlockNumber:   blockNumber,
+            BlockNumber:   sbatch.BlockNumber,
             SequencerAddr: sbatch.SequencerAddr,
         }
         batch := state.Batch{
@@ -626,16 +669,6 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
             BatchL2Data: sbatch.Transactions,
         }
 
-        // This should not be necessary, since HotShot should enforce non-decreasing timestamps.
-        // However, since HotShot does not currently support the ValidatedState API, timestamps
-        // proposed by leaders are not checked by replicas, and may occasionally decrease. In this
-        // case, just use the previous timestamp, to avoid breaking the rest of the execution
-        // pipeline.
-        if batch.Timestamp.Before(prevTimestamp) {
-            batch.Timestamp = prevTimestamp
-        }
-        prevTimestamp = batch.Timestamp
-
         // Forced batches no longer supported, don't need to be handled
 
         // Now we need to check the batch. ForcedBatches should be already stored in the batch table because this is done by the sequencer
@@ -719,15 +752,15 @@ func (s *ClientSynchronizer) processSequenceBatches(sequencedBatches []etherman.
         FromBatchNumber: sequencedBatches[0].BatchNumber,
         ToBatchNumber:   sequencedBatches[len(sequencedBatches)-1].BatchNumber,
     }
-    err = s.state.AddSequence(s.ctx, seq, dbTx)
+    err := s.state.AddSequence(s.ctx, seq, dbTx)
     if err != nil {
         log.Errorf("error adding sequence. Sequence: %+v", seq)
         rollbackErr := dbTx.Rollback(s.ctx)
         if rollbackErr != nil {
-            log.Errorf("error rolling back state. BlockNumber: %d, rollbackErr: %s, error : %w", blockNumber, rollbackErr.Error(), err)
+            log.Errorf("error rolling back state. rollbackErr: %s, error : %w", rollbackErr.Error(), err)
             return rollbackErr
         }
-        log.Errorf("error getting adding sequence. BlockNumber: %d, error: %w", blockNumber, err)
+        log.Errorf("error adding sequence. error: %w", err)
         return err
     }
     return nil
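
Reviewer note, appended after the patch and not part of it: the cursor logic that fetchL2Block now enforces can be read in isolation. The following standalone Go sketch is a hypothetical distillation; checkBatch and its local L2BatchInfo are invented names that mirror state.L2BatchInfo and the checks added above, not code from this repository.

package main

import (
    "errors"
    "fmt"
)

// L2BatchInfo mirrors state.L2BatchInfo introduced by the patch.
type L2BatchInfo struct {
    Number    uint64
    L1Block   uint64
    Timestamp uint64
}

// ErrSkipBatch mirrors etherman.ErrSkipBatch.
var ErrSkipBatch = errors.New("skip old batch")

// checkBatch is a standalone distillation (invented for this note) of the cursor checks
// fetchL2Block now performs: a batch older than prevBatch.Number+1 is skipped, a batch
// newer than prevBatch.Number+1 is an error, and non-increasing L1 block numbers or
// timestamps are clamped to the previous values before the cursor is advanced.
func checkBatch(prev *L2BatchInfo, batchNum, l1Block, timestamp uint64) (uint64, uint64, error) {
    switch {
    case batchNum < prev.Number+1:
        // Already processed; harmless to skip.
        return 0, 0, ErrSkipBatch
    case batchNum > prev.Number+1:
        // A gap means we somehow skipped a batch; let the caller retry.
        return 0, 0, fmt.Errorf("received batch %d from the future, prev batch is %v", batchNum, prev)
    }
    if l1Block < prev.L1Block {
        // A leader may propose a stale L1 block; reuse the previous one.
        l1Block = prev.L1Block
    }
    if timestamp < prev.Timestamp {
        timestamp = prev.Timestamp
    }
    *prev = L2BatchInfo{Number: batchNum, L1Block: l1Block, Timestamp: timestamp}
    return l1Block, timestamp, nil
}

func main() {
    prev := L2BatchInfo{Number: 9, L1Block: 100, Timestamp: 1700000000}
    // Batch 10 arrives with a slightly older L1 block and timestamp than batch 9:
    // both are clamped and the cursor advances.
    l1, ts, err := checkBatch(&prev, 10, 99, 1699999999)
    fmt.Println(l1, ts, err, prev)
}

Running it prints the clamped values and the advanced cursor, which is the behaviour the patch relies on when HotShot proposes a decreasing L1 block number or timestamp.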
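A second sketch, equally hypothetical, condenses how the synchronizer consumes the new cursor. It assumes the upstream zkevm-node module path and uses only the stateInterface and ethermanInterface methods declared in synchronizer/interfaces.go; syncPreconfirmationsOnce is an invented helper, not a function in this patch.

package synchronizer

import (
    "context"

    "github.com/0xPolygonHermez/zkevm-node/etherman"
    "github.com/ethereum/go-ethereum/common"
)

// syncPreconfirmationsOnce shows one iteration of the preconfirmation sync loop after
// this patch: the DB supplies the last synced batch as a full L2BatchInfo cursor and
// etherman resumes from prevBatch.Number+1 internally, so the caller no longer adds 1.
func syncPreconfirmationsOnce(ctx context.Context, st stateInterface, em ethermanInterface) ([]etherman.Block, map[common.Hash][]etherman.Order, error) {
    prevBatch, err := st.GetLastBatchInfo(ctx, nil) // batch number, L1 origin and timestamp
    if err != nil {
        return nil, nil, err
    }
    return em.GetPreconfirmations(ctx, prevBatch)
}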