From baf0a609131bd09f9a5f138be025c5ccd2ca1485 Mon Sep 17 00:00:00 2001 From: ledgerwatch Date: Sat, 30 Nov 2024 13:52:27 +0000 Subject: [PATCH] Additional fix for externalCL integration (prevent MDBX_MAP_FULL) (#12922) Fixes https://github.com/erigontech/erigon/issues/12225 --- erigon-lib/kv/mdbx/kv_mdbx_temporary.go | 2 +- .../block_downloader.go | 80 ++----------------- .../engineapi/engine_block_downloader/body.go | 27 +++++-- .../engineapi/engine_block_downloader/core.go | 7 +- 4 files changed, 31 insertions(+), 85 deletions(-) diff --git a/erigon-lib/kv/mdbx/kv_mdbx_temporary.go b/erigon-lib/kv/mdbx/kv_mdbx_temporary.go index 879a2e95fe4..aa4e87f5f16 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx_temporary.go +++ b/erigon-lib/kv/mdbx/kv_mdbx_temporary.go @@ -37,7 +37,7 @@ func NewTemporaryMdbx(ctx context.Context, tempdir string) (kv.RwDB, error) { return &TemporaryMdbx{}, err } - db, err := New(kv.ChainDB, log.Root()).InMem(path).GrowthStep(64 * datasize.MB).Open(ctx) + db, err := New(kv.ChainDB, log.Root()).InMem(path).Open(ctx) if err != nil { return &TemporaryMdbx{}, err } diff --git a/turbo/engineapi/engine_block_downloader/block_downloader.go b/turbo/engineapi/engine_block_downloader/block_downloader.go index 60d64fd0e0a..a516896d265 100644 --- a/turbo/engineapi/engine_block_downloader/block_downloader.go +++ b/turbo/engineapi/engine_block_downloader/block_downloader.go @@ -17,7 +17,6 @@ package engine_block_downloader import ( - "bytes" "context" "encoding/binary" "fmt" @@ -35,7 +34,6 @@ import ( "github.com/erigontech/erigon-lib/etl" execution "github.com/erigontech/erigon-lib/gointerfaces/executionproto" "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/kv/dbutils" "github.com/erigontech/erigon-lib/rlp" "github.com/erigontech/erigon/core/rawdb" @@ -164,10 +162,11 @@ func (e *EngineBlockDownloader) waitForEndOfHeadersDownload(ctx context.Context) } // waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome. -func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) { +func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, err error) { var lastValidHash libcommon.Hash var badChainError error // TODO(yperbasis): this is not set anywhere var foundPow bool + var found bool headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error { var h types.Header @@ -185,8 +184,8 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin lastValidHash = h.ParentHash // If we are in PoW range then block validation is not required anymore. if foundPow { - if (fromHash == libcommon.Hash{}) { - fromHash = h.Hash() + if !found { + found = true fromBlock = h.Number.Uint64() } toBlock = h.Number.Uint64() @@ -195,15 +194,15 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin foundPow = h.Difficulty.Sign() != 0 if foundPow { - if (fromHash == libcommon.Hash{}) { - fromHash = h.Hash() + if !found { + found = true fromBlock = h.Number.Uint64() } toBlock = h.Number.Uint64() return saveHeader(tx, &h, h.Hash()) } - if (fromHash == libcommon.Hash{}) { - fromHash = h.Hash() + if !found { + found = true fromBlock = h.Number.Uint64() } toBlock = h.Number.Uint64() @@ -238,66 +237,3 @@ func saveHeader(db kv.RwTx, header *types.Header, hash libcommon.Hash) error { } return nil } - -func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx kv.Tx, fromBlock uint64, fromHash libcommon.Hash, toBlock uint64) error { - blockBatchSize := 500 - blockWrittenLogSize := 20_000 - // We divide them in batches - blocksBatch := []*types.Block{} - - headersCursors, err := tx.Cursor(kv.Headers) - if err != nil { - return err - } - inserted := uint64(0) - - log.Info("Beginning downloaded blocks insertion") - // Start by seeking headers - for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() { - if err != nil { - return err - } - if len(blocksBatch) == blockBatchSize { - if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil { - return err - } - currentHeader := e.chainRW.CurrentHeader(ctx) - lastBlockNumber := blocksBatch[len(blocksBatch)-1].NumberU64() - isForkChoiceNeeded := currentHeader == nil || lastBlockNumber > currentHeader.Number.Uint64() - inserted += uint64(len(blocksBatch)) - if inserted >= uint64(e.syncCfg.LoopBlockLimit) { - lastHash := blocksBatch[len(blocksBatch)-1].Hash() - if isForkChoiceNeeded { - if _, _, _, err := e.chainRW.UpdateForkChoice(ctx, lastHash, lastHash, lastHash); err != nil { - return err - } - } - inserted = 0 - } - blocksBatch = blocksBatch[:0] - } - header := new(types.Header) - if err := rlp.Decode(bytes.NewReader(v), header); err != nil { - e.logger.Error("Invalid block header RLP", "err", err) - return nil - } - number := header.Number.Uint64() - if number > toBlock { - return e.chainRW.InsertBlocksAndWait(ctx, blocksBatch) - } - hash := header.Hash() - body, err := rawdb.ReadBodyWithTransactions(tx, hash, number) - if err != nil { - return err - } - if body == nil { - return fmt.Errorf("missing body at block=%d", number) - } - blocksBatch = append(blocksBatch, types.NewBlockFromStorage(hash, header, body.Transactions, body.Uncles, body.Withdrawals)) - if number%uint64(blockWrittenLogSize) == 0 { - e.logger.Info("[insertHeadersAndBodies] Written blocks", "progress", number, "to", toBlock) - } - } - return e.chainRW.InsertBlocksAndWait(ctx, blocksBatch) - -} diff --git a/turbo/engineapi/engine_block_downloader/body.go b/turbo/engineapi/engine_block_downloader/body.go index 5d440ff680f..bdb640a385f 100644 --- a/turbo/engineapi/engine_block_downloader/body.go +++ b/turbo/engineapi/engine_block_downloader/body.go @@ -27,7 +27,7 @@ import ( libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/dbg" "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon/core/rawdb" + "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/dataflow" "github.com/erigontech/erigon/eth/stagedsync/stages" "github.com/erigontech/erigon/turbo/stages/bodydownload" @@ -80,6 +80,9 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co prevProgress := bodyProgress var noProgressCount uint = 0 // How many time the progress was printed without actual progress var totalDelivered uint64 = 0 + blockBatchSize := 500 + // We divide them in batches + blocksBatch := []*types.Block{} loopBody := func() (bool, error) { // loopCount is used here to ensure we don't get caught in a constant loop of making requests @@ -146,14 +149,20 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock) } + if len(blocksBatch) == blockBatchSize { + if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil { + return false, fmt.Errorf("InsertBlock: %w", err) + } + blocksBatch = blocksBatch[:0] + } // Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call) - ok, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody) + rawBlock := types.RawBlock{Header: header, Body: rawBody} + block, err := rawBlock.AsBlock() if err != nil { - return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err) - } - if ok { - dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared) + return false, fmt.Errorf("Could not construct block: %w", err) } + blocksBatch = append(blocksBatch, block) + dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared) if blockHeight > bodyProgress { bodyProgress = blockHeight @@ -202,6 +211,12 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co return err } } + if len(blocksBatch) > 0 { + if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil { + return fmt.Errorf("InsertBlock: %w", err) + } + blocksBatch = blocksBatch[:0] + } if stopped { return libcommon.ErrStopped diff --git a/turbo/engineapi/engine_block_downloader/core.go b/turbo/engineapi/engine_block_downloader/core.go index adf9f61d7a3..03e7fa47a47 100644 --- a/turbo/engineapi/engine_block_downloader/core.go +++ b/turbo/engineapi/engine_block_downloader/core.go @@ -88,7 +88,7 @@ func (e *EngineBlockDownloader) download(ctx context.Context, hashToDownload lib return } } - startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation) + startBlock, endBlock, err := e.loadDownloadedHeaders(memoryMutation) if err != nil { e.logger.Warn("[EngineBlockDownloader] Could not load headers", "err", err) e.status.Store(headerdownload.Idle) @@ -102,11 +102,6 @@ func (e *EngineBlockDownloader) download(ctx context.Context, hashToDownload lib return } tx.Rollback() // Discard the original db tx - if err := e.insertHeadersAndBodies(ctx, tmpTx, startBlock, startHash, endBlock); err != nil { - e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies", "err", err) - e.status.Store(headerdownload.Idle) - return - } e.logger.Info("[EngineBlockDownloader] Finished downloading blocks", "from", startBlock-1, "to", endBlock) if block == nil { e.status.Store(headerdownload.Idle)