From 5df524cbc69855432b156720b9e8569632e774b4 Mon Sep 17 00:00:00 2001 From: scooletz Date: Fri, 24 Jan 2025 17:05:26 +0100 Subject: [PATCH] FastHeadersSyncFeed explicitly flush before setting metadata --- .../Nethermind.Blockchain/BlockTree.cs | 16 +++++++++ .../Nethermind.Blockchain/BlockTreeOverlay.cs | 6 ++-- .../Blocks/BlockStore.cs | 2 ++ .../Blocks/IBlockStore.cs | 1 + .../Headers/HeaderStore.cs | 6 ++++ .../Headers/IHeaderStore.cs | 5 +++ .../Nethermind.Blockchain/IBlockTree.cs | 36 ++++++++++++++++--- .../ReadOnlyBlockTree.cs | 2 ++ src/Nethermind/Nethermind.Era1/EraImporter.cs | 2 +- .../Simulate/SimulateDictionaryBlockStore.cs | 4 +++ .../Simulate/SimulateDictionaryHeaderStore.cs | 4 +++ .../Repositories/ChainLevelInfoRepository.cs | 2 ++ .../Repositories/IChainLevelInfoRepository.cs | 1 + .../FastBlocks/BodiesSyncFeedTests.cs | 1 - .../FastBlocks/BodiesSyncFeed.cs | 12 +++---- .../FastBlocks/FastHeadersSyncFeed.cs | 3 +- .../ParallelSync/ActivatedSyncFeed.cs | 1 - 17 files changed, 85 insertions(+), 19 deletions(-) diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTree.cs b/src/Nethermind/Nethermind.Blockchain/BlockTree.cs index acb74922fdb..8aafe6f46c7 100644 --- a/src/Nethermind/Nethermind.Blockchain/BlockTree.cs +++ b/src/Nethermind/Nethermind.Blockchain/BlockTree.cs @@ -273,6 +273,22 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he return AddBlockResult.Added; } + public void Flush(FlushReason reason) + { + switch (reason) + { + case FlushReason.InsertHeaders: + _headerStore.Flush(); + _chainLevelInfoRepository.Flush(); + break; + case FlushReason.InsertBlocks: + _blockStore.Flush(); + break; + default: + throw new ArgumentOutOfRangeException(nameof(reason), reason, null); + } + } + public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None) { diff --git a/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs b/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs index 6df42379400..c4b2585b3ee 100644 --- a/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs +++ b/src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs @@ -58,8 +58,10 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, - WriteFlags bodiesWriteFlags = WriteFlags.None) => - _overlayTree.Insert(block, insertBlockOptions, insertHeaderOptions, bodiesWriteFlags); + WriteFlags blockWriteFlags = WriteFlags.None) => + _overlayTree.Insert(block, insertBlockOptions, insertHeaderOptions, blockWriteFlags); + + public void Flush(FlushReason reason) => _overlayTree.Flush(reason); public void UpdateHeadBlock(Hash256 blockHash) => _overlayTree.UpdateHeadBlock(blockHash); diff --git a/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs b/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs index 3cb168ba391..416a99d561d 100644 --- a/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs +++ b/src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs @@ -96,4 +96,6 @@ public void Cache(Block block) { _blockCache.Set(block.Hash, block); } + + public void Flush() => blockDb.Flush(); } diff --git a/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs b/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs index eca635bc9d2..ee0c2a5eed2 100644 --- a/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs +++ b/src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs @@ -20,4 +20,5 @@ public interface IBlockStore ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash); void Cache(Block block); bool HasBlock(long blockNumber, Hash256 blockHash); + void Flush(); } diff --git a/src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs b/src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs index c8d6044e7db..e2636d7718b 100644 --- a/src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs +++ b/src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs @@ -80,6 +80,12 @@ public void InsertBlockNumber(Hash256 blockHash, long blockNumber) return Get(blockHash)?.Number; } + public void Flush() + { + _headerDb.Flush(); + _blockNumberDb.Flush(); + } + private long? GetBlockNumberFromBlockNumberDb(Hash256 blockHash) { Span numberSpan = _blockNumberDb.GetSpan(blockHash); diff --git a/src/Nethermind/Nethermind.Blockchain/Headers/IHeaderStore.cs b/src/Nethermind/Nethermind.Blockchain/Headers/IHeaderStore.cs index 4b1b2bc9025..8730d528c97 100644 --- a/src/Nethermind/Nethermind.Blockchain/Headers/IHeaderStore.cs +++ b/src/Nethermind/Nethermind.Blockchain/Headers/IHeaderStore.cs @@ -14,4 +14,9 @@ public interface IHeaderStore void Delete(Hash256 blockHash); void InsertBlockNumber(Hash256 blockHash, long blockNumber); long? GetBlockNumber(Hash256 blockHash); + + /// + /// Flushed the underlying db. + /// + void Flush(); } diff --git a/src/Nethermind/Nethermind.Blockchain/IBlockTree.cs b/src/Nethermind/Nethermind.Blockchain/IBlockTree.cs index 79f19ed581c..8800a66be9f 100644 --- a/src/Nethermind/Nethermind.Blockchain/IBlockTree.cs +++ b/src/Nethermind/Nethermind.Blockchain/IBlockTree.cs @@ -65,15 +65,25 @@ public interface IBlockTree : IBlockFinder /// Header to add /// /// Result of the operation, eg. Added, AlreadyKnown, etc. - AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None); + AddBlockResult Insert(BlockHeader header, + BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None); /// /// Inserts a disconnected block body (not for processing). /// /// Block to add + /// The write flags overrides to be used for this insert operation. /// Result of the operation, eg. Added, AlreadyKnown, etc. - AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, - BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags bodiesWriteFlags = WriteFlags.None); + AddBlockResult Insert(Block block, + BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, + BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, + WriteFlags blockWriteFlags = WriteFlags.None); + + /// + /// Flushes underlying storages for the specific . + /// + /// The reason for flushing, showing what changes should be persisted. + void Flush(FlushReason reason); void UpdateHeadBlock(Hash256 blockHash); @@ -83,7 +93,8 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption /// Block to be included /// Options for suggesting block, whether a block should be processed or just added to the store. /// Result of the operation, eg. Added, AlreadyKnown, etc. - AddBlockResult SuggestBlock(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess); + AddBlockResult SuggestBlock(Block block, + BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess); /// /// Suggests block for inclusion in the block tree. Wait for DB unlock if needed. @@ -91,7 +102,8 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption /// Block to be included /// Options for suggesting block, whether a block should be processed or just added to the store. /// Result of the operation, eg. Added, AlreadyKnown, etc. - ValueTask SuggestBlockAsync(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess); + ValueTask SuggestBlockAsync(Block block, + BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess); /// /// Suggests a block header (without body) @@ -179,4 +191,18 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption void RecalculateTreeLevels(); } + + public enum FlushReason + { + /// + /// Flush after + /// is called + /// + InsertHeaders, + + /// + /// Flush after for a block is called. + /// + InsertBlocks, + } } diff --git a/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs b/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs index 90bfb20a6ac..afb3c319d78 100644 --- a/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs +++ b/src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs @@ -68,6 +68,8 @@ public async Task Accept(IBlockTreeVisitor blockTreeVisitor, CancellationToken c public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls"); + public void Flush(FlushReason reason) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Flush)} calls"); + public void Insert(IEnumerable blocks) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls"); public void UpdateHeadBlock(Hash256 blockHash) diff --git a/src/Nethermind/Nethermind.Era1/EraImporter.cs b/src/Nethermind/Nethermind.Era1/EraImporter.cs index ef9706cfeed..89e023a7b3e 100644 --- a/src/Nethermind/Nethermind.Era1/EraImporter.cs +++ b/src/Nethermind/Nethermind.Era1/EraImporter.cs @@ -203,7 +203,7 @@ async Task ImportBlock(long blockNumber) private void InsertBlockAndReceipts(Block b, TxReceipt[] r, long lastBlockNumber) { if (blockTree.FindBlock(b.Number) is null) - blockTree.Insert(b, BlockTreeInsertBlockOptions.SaveHeader | BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, bodiesWriteFlags: WriteFlags.DisableWAL); + blockTree.Insert(b, BlockTreeInsertBlockOptions.SaveHeader | BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, blockWriteFlags: WriteFlags.DisableWAL); if (!receiptStorage.HasBlock(b.Number, b.Hash!)) receiptStorage.Insert(b, r, true, writeFlags: WriteFlags.DisableWAL, lastBlockNumber: lastBlockNumber); } diff --git a/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs b/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs index c0c26e49869..de64507b422 100644 --- a/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs +++ b/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryBlockStore.cs @@ -75,4 +75,8 @@ public bool HasBlock(long blockNumber, Hash256 blockHash) { return _blockNumDict.ContainsKey(blockNumber); } + + public void Flush() + { + } } diff --git a/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryHeaderStore.cs b/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryHeaderStore.cs index 97f6141218d..7ed84a5140f 100644 --- a/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryHeaderStore.cs +++ b/src/Nethermind/Nethermind.Facade/Simulate/SimulateDictionaryHeaderStore.cs @@ -66,4 +66,8 @@ public void InsertBlockNumber(Hash256 blockHash, long blockNumber) { return _blockNumberDict.TryGetValue(blockHash, out var blockNumber) ? blockNumber : readonlyBaseHeaderStore.GetBlockNumber(blockHash); } + + public void Flush() + { + } } diff --git a/src/Nethermind/Nethermind.State/Repositories/ChainLevelInfoRepository.cs b/src/Nethermind/Nethermind.State/Repositories/ChainLevelInfoRepository.cs index 5e08b326536..0a7d9c86144 100644 --- a/src/Nethermind/Nethermind.State/Repositories/ChainLevelInfoRepository.cs +++ b/src/Nethermind/Nethermind.State/Repositories/ChainLevelInfoRepository.cs @@ -71,5 +71,7 @@ void LocalPersistLevel() public BatchWrite StartBatch() => new(_writeLock); public ChainLevelInfo? LoadLevel(long number) => _blockInfoDb.Get(number, Rlp.GetStreamDecoder(), _blockInfoCache); + + public void Flush() => _blockInfoDb.Flush(); } } diff --git a/src/Nethermind/Nethermind.State/Repositories/IChainLevelInfoRepository.cs b/src/Nethermind/Nethermind.State/Repositories/IChainLevelInfoRepository.cs index 38f6b5c7f3f..d69447caf6b 100644 --- a/src/Nethermind/Nethermind.State/Repositories/IChainLevelInfoRepository.cs +++ b/src/Nethermind/Nethermind.State/Repositories/IChainLevelInfoRepository.cs @@ -11,5 +11,6 @@ public interface IChainLevelInfoRepository void PersistLevel(long number, ChainLevelInfo level, BatchWrite? batch = null); BatchWrite StartBatch(); ChainLevelInfo? LoadLevel(long number); + void Flush(); } } diff --git a/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs b/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs index 42a73a1220f..15ddd86b516 100644 --- a/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs +++ b/src/Nethermind/Nethermind.Synchronization.Test/FastBlocks/BodiesSyncFeedTests.cs @@ -72,7 +72,6 @@ public void Setup() Substitute.For(), _syncConfig, new NullSyncReport(), - _blocksDb, _metadataDb, LimboLogs.Instance, flushDbInterval: 10 diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs index ebf493078fb..0bd826ee649 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/BodiesSyncFeed.cs @@ -6,15 +6,12 @@ using System.Threading.Tasks; using Autofac.Features.AttributeFilters; using Nethermind.Blockchain; -using Nethermind.Blockchain.Blocks; using Nethermind.Blockchain.Synchronization; using Nethermind.Consensus.Validators; using Nethermind.Core; -using Nethermind.Core.Extensions; using Nethermind.Core.Specs; using Nethermind.Db; using Nethermind.Logging; -using Nethermind.Serialization.Rlp; using Nethermind.Stats.Model; using Nethermind.Synchronization.ParallelSync; using Nethermind.Synchronization.Peers; @@ -40,7 +37,6 @@ public class BodiesSyncFeed : BarrierSyncFeed private readonly ISyncReport _syncReport; private readonly ISyncPeerPool _syncPeerPool; private readonly ISyncPointers _syncPointers; - private readonly IDb _blocksDb; private SyncStatusList _syncStatusList; @@ -56,7 +52,6 @@ public BodiesSyncFeed( ISyncPeerPool syncPeerPool, ISyncConfig syncConfig, ISyncReport syncReport, - [KeyFilter(DbNames.Blocks)] IDb blocksDb, [KeyFilter(DbNames.Metadata)] IDb metadataDb, ILogManager logManager, long flushDbInterval = DefaultFlushDbInterval) @@ -67,7 +62,6 @@ public BodiesSyncFeed( _syncPeerPool = syncPeerPool; _syncConfig = syncConfig; _syncReport = syncReport; - _blocksDb = blocksDb; _flushDbInterval = flushDbInterval; if (!_syncConfig.FastSync) @@ -169,7 +163,9 @@ private void PostFinishCleanUp() private void Flush() { long lowestInsertedAtPoint = _syncStatusList.LowestInsertWithoutGaps; - _blocksDb.Flush(); + + _blockTree.Flush(FlushReason.InsertBlocks); + _syncPointers.LowestInsertedBodyNumber = lowestInsertedAtPoint; } @@ -277,7 +273,7 @@ private int InsertBodies(BodiesSyncBatch batch) private void InsertOneBlock(Block block) { - _blockTree.Insert(block, BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, bodiesWriteFlags: WriteFlags.DisableWAL); + _blockTree.Insert(block, BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, blockWriteFlags: WriteFlags.DisableWAL); _syncStatusList.MarkInserted(block.Number); } diff --git a/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs index 088c3f39d81..709db2ce3df 100644 --- a/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/FastBlocks/FastHeadersSyncFeed.cs @@ -27,7 +27,6 @@ namespace Nethermind.Synchronization.FastBlocks { public class HeadersSyncFeed : ActivatedSyncFeed { - private readonly ILogger _logger; private readonly ISyncPeerPool _syncPeerPool; protected readonly ISyncReport _syncReport; @@ -638,6 +637,8 @@ protected virtual int InsertHeaders(HeadersSyncBatch batch) if (lowestInsertedHeader is not null && lowestInsertedHeader.Number < (LowestInsertedBlockHeader?.Number ?? long.MaxValue)) { + // Flush first, so that LowestInsertedHeader is preserved only after the headers are set + _blockTree.Flush(FlushReason.InsertHeaders); LowestInsertedBlockHeader = lowestInsertedHeader; } diff --git a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs index 39b2e66d300..936a140f84e 100644 --- a/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs +++ b/src/Nethermind/Nethermind.Synchronization/ParallelSync/ActivatedSyncFeed.cs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: LGPL-3.0-only using System; -using System.Threading.Tasks; namespace Nethermind.Synchronization.ParallelSync {