Skip to content

Commit

Permalink
Additional fix for externalCL integration (prevent MDBX_MAP_FULL) (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexeyAkhunov authored Nov 30, 2024
1 parent ba101ac commit baf0a60
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 85 deletions.
2 changes: 1 addition & 1 deletion erigon-lib/kv/mdbx/kv_mdbx_temporary.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewTemporaryMdbx(ctx context.Context, tempdir string) (kv.RwDB, error) {
return &TemporaryMdbx{}, err
}

db, err := New(kv.ChainDB, log.Root()).InMem(path).GrowthStep(64 * datasize.MB).Open(ctx)
db, err := New(kv.ChainDB, log.Root()).InMem(path).Open(ctx)
if err != nil {
return &TemporaryMdbx{}, err
}
Expand Down
80 changes: 8 additions & 72 deletions turbo/engineapi/engine_block_downloader/block_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package engine_block_downloader

import (
"bytes"
"context"
"encoding/binary"
"fmt"
Expand All @@ -35,7 +34,6 @@ import (
"github.com/erigontech/erigon-lib/etl"
execution "github.com/erigontech/erigon-lib/gointerfaces/executionproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/dbutils"

"github.com/erigontech/erigon-lib/rlp"
"github.com/erigontech/erigon/core/rawdb"
Expand Down Expand Up @@ -164,10 +162,11 @@ func (e *EngineBlockDownloader) waitForEndOfHeadersDownload(ctx context.Context)
}

// waitForEndOfHeadersDownload waits until the download of headers ends and returns the outcome.
func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, fromHash libcommon.Hash, err error) {
func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uint64, toBlock uint64, err error) {
var lastValidHash libcommon.Hash
var badChainError error // TODO(yperbasis): this is not set anywhere
var foundPow bool
var found bool

headerLoadFunc := func(key, value []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
var h types.Header
Expand All @@ -185,8 +184,8 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin
lastValidHash = h.ParentHash
// If we are in PoW range then block validation is not required anymore.
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
if !found {
found = true
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
Expand All @@ -195,15 +194,15 @@ func (e *EngineBlockDownloader) loadDownloadedHeaders(tx kv.RwTx) (fromBlock uin

foundPow = h.Difficulty.Sign() != 0
if foundPow {
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
if !found {
found = true
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
return saveHeader(tx, &h, h.Hash())
}
if (fromHash == libcommon.Hash{}) {
fromHash = h.Hash()
if !found {
found = true
fromBlock = h.Number.Uint64()
}
toBlock = h.Number.Uint64()
Expand Down Expand Up @@ -238,66 +237,3 @@ func saveHeader(db kv.RwTx, header *types.Header, hash libcommon.Hash) error {
}
return nil
}

func (e *EngineBlockDownloader) insertHeadersAndBodies(ctx context.Context, tx kv.Tx, fromBlock uint64, fromHash libcommon.Hash, toBlock uint64) error {
blockBatchSize := 500
blockWrittenLogSize := 20_000
// We divide them in batches
blocksBatch := []*types.Block{}

headersCursors, err := tx.Cursor(kv.Headers)
if err != nil {
return err
}
inserted := uint64(0)

log.Info("Beginning downloaded blocks insertion")
// Start by seeking headers
for k, v, err := headersCursors.Seek(dbutils.HeaderKey(fromBlock, fromHash)); k != nil; k, v, err = headersCursors.Next() {
if err != nil {
return err
}
if len(blocksBatch) == blockBatchSize {
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return err
}
currentHeader := e.chainRW.CurrentHeader(ctx)
lastBlockNumber := blocksBatch[len(blocksBatch)-1].NumberU64()
isForkChoiceNeeded := currentHeader == nil || lastBlockNumber > currentHeader.Number.Uint64()
inserted += uint64(len(blocksBatch))
if inserted >= uint64(e.syncCfg.LoopBlockLimit) {
lastHash := blocksBatch[len(blocksBatch)-1].Hash()
if isForkChoiceNeeded {
if _, _, _, err := e.chainRW.UpdateForkChoice(ctx, lastHash, lastHash, lastHash); err != nil {
return err
}
}
inserted = 0
}
blocksBatch = blocksBatch[:0]
}
header := new(types.Header)
if err := rlp.Decode(bytes.NewReader(v), header); err != nil {
e.logger.Error("Invalid block header RLP", "err", err)
return nil
}
number := header.Number.Uint64()
if number > toBlock {
return e.chainRW.InsertBlocksAndWait(ctx, blocksBatch)
}
hash := header.Hash()
body, err := rawdb.ReadBodyWithTransactions(tx, hash, number)
if err != nil {
return err
}
if body == nil {
return fmt.Errorf("missing body at block=%d", number)
}
blocksBatch = append(blocksBatch, types.NewBlockFromStorage(hash, header, body.Transactions, body.Uncles, body.Withdrawals))
if number%uint64(blockWrittenLogSize) == 0 {
e.logger.Info("[insertHeadersAndBodies] Written blocks", "progress", number, "to", toBlock)
}
}
return e.chainRW.InsertBlocksAndWait(ctx, blocksBatch)

}
27 changes: 21 additions & 6 deletions turbo/engineapi/engine_block_downloader/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/common/dbg"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon/core/rawdb"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/dataflow"
"github.com/erigontech/erigon/eth/stagedsync/stages"
"github.com/erigontech/erigon/turbo/stages/bodydownload"
Expand Down Expand Up @@ -80,6 +80,9 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co
prevProgress := bodyProgress
var noProgressCount uint = 0 // How many time the progress was printed without actual progress
var totalDelivered uint64 = 0
blockBatchSize := 500
// We divide them in batches
blocksBatch := []*types.Block{}

loopBody := func() (bool, error) {
// loopCount is used here to ensure we don't get caught in a constant loop of making requests
Expand Down Expand Up @@ -146,14 +149,20 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co
return false, fmt.Errorf("[%s] Header block unexpected when matching body, got %v, expected %v", logPrefix, blockHeight, nextBlock)
}

if len(blocksBatch) == blockBatchSize {
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return false, fmt.Errorf("InsertBlock: %w", err)
}
blocksBatch = blocksBatch[:0]
}
// Check existence before write - because WriteRawBody isn't idempotent (it allocates new sequence range for transactions on every call)
ok, err := rawdb.WriteRawBodyIfNotExists(tx, header.Hash(), blockHeight, rawBody)
rawBlock := types.RawBlock{Header: header, Body: rawBody}
block, err := rawBlock.AsBlock()
if err != nil {
return false, fmt.Errorf("WriteRawBodyIfNotExists: %w", err)
}
if ok {
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)
return false, fmt.Errorf("Could not construct block: %w", err)
}
blocksBatch = append(blocksBatch, block)
dataflow.BlockBodyDownloadStates.AddChange(blockHeight, dataflow.BlockBodyCleared)

if blockHeight > bodyProgress {
bodyProgress = blockHeight
Expand Down Expand Up @@ -202,6 +211,12 @@ func (e *EngineBlockDownloader) downloadAndLoadBodiesSyncronously(ctx context.Co
return err
}
}
if len(blocksBatch) > 0 {
if err := e.chainRW.InsertBlocksAndWait(ctx, blocksBatch); err != nil {
return fmt.Errorf("InsertBlock: %w", err)
}
blocksBatch = blocksBatch[:0]
}

if stopped {
return libcommon.ErrStopped
Expand Down
7 changes: 1 addition & 6 deletions turbo/engineapi/engine_block_downloader/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (e *EngineBlockDownloader) download(ctx context.Context, hashToDownload lib
return
}
}
startBlock, endBlock, startHash, err := e.loadDownloadedHeaders(memoryMutation)
startBlock, endBlock, err := e.loadDownloadedHeaders(memoryMutation)
if err != nil {
e.logger.Warn("[EngineBlockDownloader] Could not load headers", "err", err)
e.status.Store(headerdownload.Idle)
Expand All @@ -102,11 +102,6 @@ func (e *EngineBlockDownloader) download(ctx context.Context, hashToDownload lib
return
}
tx.Rollback() // Discard the original db tx
if err := e.insertHeadersAndBodies(ctx, tmpTx, startBlock, startHash, endBlock); err != nil {
e.logger.Warn("[EngineBlockDownloader] Could not insert headers and bodies", "err", err)
e.status.Store(headerdownload.Idle)
return
}
e.logger.Info("[EngineBlockDownloader] Finished downloading blocks", "from", startBlock-1, "to", endBlock)
if block == nil {
e.status.Store(headerdownload.Idle)
Expand Down

0 comments on commit baf0a60

Please sign in to comment.