Skip to content

Commit

Permalink
allSnapshots should not panic, regardless of failure (#13355)
Browse files Browse the repository at this point in the history
Closes #13295
awskii authored Jan 21, 2025
1 parent eca5fe3 commit 8ea3fec
Showing 5 changed files with 123 additions and 34 deletions.
30 changes: 26 additions & 4 deletions cmd/integration/commands/reset_state.go
Original file line number Diff line number Diff line change
@@ -55,7 +55,12 @@ var cmdResetState = &cobra.Command{
}
ctx, _ := common.RootContext()
defer db.Close()
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
logger.Error("Opening snapshots", "error", err)
return
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -138,11 +143,28 @@ func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *heimdall.
}
fmt.Fprintf(w, "--\n")
fmt.Fprintf(w, "prune distance: %s\n\n", pm.String())
fmt.Fprintf(w, "blocks: segments=%d, indices=%d\n", snapshots.SegmentsMax(), snapshots.IndicesMax())
fmt.Fprintf(w, "blocks.bor: segments=%d, indices=%d\n\n", borSn.SegmentsMax(), borSn.IndicesMax())
if snapshots != nil {
fmt.Fprintf(w, "blocks: segments=%d, indices=%d\n", snapshots.SegmentsMax(), snapshots.IndicesMax())
} else {
fmt.Fprintf(w, "blocks: segments=0, indices=0; failed to open snapshots\n")
}
if borSn != nil {
fmt.Fprintf(w, "blocks.bor: segments=%d, indices=%d\n", borSn.SegmentsMax(), borSn.IndicesMax())
} else {
fmt.Fprintf(w, "blocks.bor: segments=0, indices=0; failed to open bor snapshots\n")
}

_lb, _lt, _ := rawdbv3.TxNums.Last(tx)
fmt.Fprintf(w, "state.history: idx steps: %.02f, TxNums_Index(%d,%d), filesAmount: %d\n\n", rawdbhelpers.IdxStepsCountV3(tx), _lb, _lt, agg.FilesAmount())

var filesAmount []int
var aggIsNotOkMessage string
if agg != nil {
filesAmount = agg.FilesAmount()
} else {
aggIsNotOkMessage = "failed to open aggregator snapshots"
}

fmt.Fprintf(w, "state.history: idx steps: %.02f, TxNums_Index(%d,%d), filesAmount: %d%s\n\n", rawdbhelpers.IdxStepsCountV3(tx), _lb, _lt, filesAmount, aggIsNotOkMessage)
ethTxSequence, err := tx.ReadSequence(kv.EthTx)
if err != nil {
return err
5 changes: 4 additions & 1 deletion cmd/integration/commands/root.go
Original file line number Diff line number Diff line change
@@ -109,6 +109,9 @@ func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (tdb kv.
}
}

_, _, agg, _, _, _ := allSnapshots(context.Background(), rawDB, logger)
_, _, agg, _, _, _, err := allSnapshots(context.Background(), rawDB, logger)
if err != nil {
return nil, err
}
return temporal.New(rawDB, agg)
}
107 changes: 81 additions & 26 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
@@ -639,7 +639,11 @@ func init() {
}

func stageSnapshots(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -694,7 +698,11 @@ func stageHeaders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
return err
}

sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -793,7 +801,10 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
return nil
}
if unwind > 0 {
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -824,7 +835,10 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
return nil
}

sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -856,7 +870,10 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
}

func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -896,7 +913,11 @@ func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) err
func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
engine, _, stageSync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
heimdallClient := engine.(*bor.Bor).HeimdallClient
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -927,7 +948,11 @@ func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
tmpdir := datadir.New(datadirCli).Tmp
chainConfig := fromdb.ChainConfig(db)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1019,7 +1044,11 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error

engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.Execution))
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}

defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1176,7 +1205,10 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger

engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
must(sync.SetCurrentStage(stages.Execution))
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1201,7 +1233,7 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
genesis := core.GenesisBlockByChainName(chain)
br, _ := blocksIO(db, logger)
cfg := stagedsync.StageCustomTraceCfg(db, pm, dirs, br, chainConfig, engine, genesis, &syncCfg)
err := stagedsync.SpawnCustomTrace(cfg, ctx, logger)
err = stagedsync.SpawnCustomTrace(cfg, ctx, logger)
if err != nil {
return err
}
@@ -1212,7 +1244,10 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
func stagePatriciaTrie(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
_ = pm
sn, _, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, _, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer agg.Close()
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
@@ -1236,7 +1271,10 @@ func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) e
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
chainConfig := fromdb.ChainConfig(db)
must(sync.SetCurrentStage(stages.TxLookup))
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1283,7 +1321,7 @@ func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) e
}

func printAllStages(db kv.RoDB, ctx context.Context, logger log.Logger) error {
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, _ := allSnapshots(ctx, db, logger) // ignore error here to get some stat.
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -1322,7 +1360,8 @@ var _aggSingleton *libstate.Aggregator
var _bridgeStoreSingleton bridge.Store
var _heimdallStoreSingleton heimdall.Store

func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *heimdall.RoSnapshots, *libstate.Aggregator, *freezeblocks.CaplinSnapshots, bridge.Store, heimdall.Store) {
func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezeblocks.RoSnapshots, *heimdall.RoSnapshots, *libstate.Aggregator, *freezeblocks.CaplinSnapshots, bridge.Store, heimdall.Store, error) {
var err error

openSnapshotOnce.Do(func() {
dirs := datadir.New(datadirCli)
@@ -1334,13 +1373,13 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
_allBorSnapshotsSingleton = heimdall.NewRoSnapshots(snapCfg, dirs.Snap, 0, logger)
_bridgeStoreSingleton = bridge.NewSnapshotStore(bridge.NewDbStore(db), _allBorSnapshotsSingleton, chainConfig.Bor)
_heimdallStoreSingleton = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), _allBorSnapshotsSingleton)
var err error
blockReader := freezeblocks.NewBlockReader(_allSnapshotsSingleton, _allBorSnapshotsSingleton, _heimdallStoreSingleton, _bridgeStoreSingleton)

txNums := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader))

_aggSingleton, err = libstate.NewAggregator2(ctx, dirs, config3.DefaultStepSize, db, logger)
if err != nil {
panic(err)
err = fmt.Errorf("aggregator init: %w", err)
return
}

_aggSingleton.SetProduceMod(snapCfg.ProduceE3)
@@ -1354,15 +1393,21 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
_allBorSnapshotsSingleton.OptimisticalyOpenFolder()
return nil
})
g.Go(func() error { return _aggSingleton.OpenFolder() })
g.Go(func() error {
err := _aggSingleton.OpenFolder()
if err != nil {
return fmt.Errorf("aggregator opening: %w", err)
}
return nil
})
g.Go(func() error {
chainConfig := fromdb.ChainConfig(db)
var beaconConfig *clparams.BeaconChainConfig
_, beaconConfig, _, err = clparams.GetConfigsByNetworkName(chainConfig.ChainName)
if err == nil {
_allCaplinSnapshotsSingleton = freezeblocks.NewCaplinSnapshots(snapCfg, beaconConfig, dirs, logger)
if err = _allCaplinSnapshotsSingleton.OpenFolder(); err != nil {
return err
return fmt.Errorf("caplin snapshots: %w", err)
}
_allCaplinSnapshotsSingleton.LogStat("caplin")
}
@@ -1378,9 +1423,8 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
logger.Info("[downloads]", "locked", er == nil, "at", mtime.Format("02 Jan 06 15:04 2006"))
return nil
})
err = g.Wait()
if err != nil {
panic(err)
if err = g.Wait(); err != nil {
return
}

_allSnapshotsSingleton.LogStat("blocks")
@@ -1390,12 +1434,17 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
defer ac.Close()
ac.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) {
_, histBlockNumProgress, err := txNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress, err
return histBlockNumProgress, fmt.Errorf("findBlockNum(%d) fails: %w", endTxNumMinimax, err)
})
return nil
})
})
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton, _allCaplinSnapshotsSingleton, _bridgeStoreSingleton, _heimdallStoreSingleton

if err != nil {
log.Error("[snapshots] failed to open", "err", err)
return nil, nil, nil, nil, nil, nil, err
}
return _allSnapshotsSingleton, _allBorSnapshotsSingleton, _aggSingleton, _allCaplinSnapshotsSingleton, _bridgeStoreSingleton, _heimdallStoreSingleton, nil
}

var openBlockReaderOnce sync.Once
@@ -1404,7 +1453,10 @@ var _blockWriterSingleton *blockio.BlockWriter

func blocksIO(db kv.RoDB, logger log.Logger) (services.FullBlockReader, *blockio.BlockWriter) {
openBlockReaderOnce.Do(func() {
sn, borSn, _, _, bridgeStore, heimdallStore := allSnapshots(context.Background(), db, logger)
sn, borSn, _, _, bridgeStore, heimdallStore, err := allSnapshots(context.Background(), db, logger)
if err != nil {
panic(err)
}
_blockReaderSingleton = freezeblocks.NewBlockReader(sn, borSn, heimdallStore, bridgeStore)
_blockWriterSingleton = blockio.NewBlockWriter()
})
@@ -1446,7 +1498,10 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.Minin
cfg.Miner = *miningConfig
}
cfg.Dirs = datadir.New(datadirCli)
allSn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
allSn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
panic(err) // we do already panic above on genesis error
}
cfg.Snapshot = allSn.Cfg()

blockReader, blockWriter := blocksIO(db, logger)
5 changes: 4 additions & 1 deletion cmd/integration/commands/state_domains.go
Original file line number Diff line number Diff line change
@@ -457,7 +457,10 @@ func makePurifiedDomains(db kv.RwDB, dirs datadir.Dirs, logger log.Logger, domai
}

func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain string, addrs [][]byte, logger log.Logger) error {
sn, bsn, agg, _, _, _ := allSnapshots(ctx, chainDb, logger)
sn, bsn, agg, _, _, _, err := allSnapshots(ctx, chainDb, logger)
if err != nil {
return err
}
defer sn.Close()
defer bsn.Close()
defer agg.Close()
10 changes: 8 additions & 2 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
@@ -156,7 +156,10 @@ func syncBySmallSteps(db kv.TemporalRwDB, miningConfig params.MiningConfig, ctx
return err
}

sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger1)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger1)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()
@@ -387,7 +390,10 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) {
func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
chainConfig := fromdb.ChainConfig(db)
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _, err := allSnapshots(ctx, db, logger)
if err != nil {
return err
}
defer sn.Close()
defer borSn.Close()
defer agg.Close()

0 comments on commit 8ea3fec

Please sign in to comment.