Skip to content

Commit

Permalink
FastHeadersSyncFeed explicitly flush before setting metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
Scooletz committed Jan 24, 2025
1 parent 4cf0c47 commit 5df524c
Show file tree
Hide file tree
Showing 17 changed files with 85 additions and 19 deletions.
16 changes: 16 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,22 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he
return AddBlockResult.Added;
}

public void Flush(FlushReason reason)
{
switch (reason)
{
case FlushReason.InsertHeaders:
_headerStore.Flush();
_chainLevelInfoRepository.Flush();
break;
case FlushReason.InsertBlocks:
_blockStore.Flush();
break;
default:
throw new ArgumentOutOfRangeException(nameof(reason), reason, null);
}
}

public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None)
{
Expand Down
6 changes: 4 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he
public AddBlockResult Insert(Block block,
BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None,
WriteFlags bodiesWriteFlags = WriteFlags.None) =>
_overlayTree.Insert(block, insertBlockOptions, insertHeaderOptions, bodiesWriteFlags);
WriteFlags blockWriteFlags = WriteFlags.None) =>
_overlayTree.Insert(block, insertBlockOptions, insertHeaderOptions, blockWriteFlags);

public void Flush(FlushReason reason) => _overlayTree.Flush(reason);

public void UpdateHeadBlock(Hash256 blockHash) =>
_overlayTree.UpdateHeadBlock(blockHash);
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ public void Cache(Block block)
{
_blockCache.Set(block.Hash, block);
}

public void Flush() => blockDb.Flush();
}
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ public interface IBlockStore
ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash);
void Cache(Block block);
bool HasBlock(long blockNumber, Hash256 blockHash);
void Flush();
}
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public void InsertBlockNumber(Hash256 blockHash, long blockNumber)
return Get(blockHash)?.Number;
}

public void Flush()
{
_headerDb.Flush();
_blockNumberDb.Flush();
}

private long? GetBlockNumberFromBlockNumberDb(Hash256 blockHash)
{
Span<byte> numberSpan = _blockNumberDb.GetSpan(blockHash);
Expand Down
5 changes: 5 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Headers/IHeaderStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ public interface IHeaderStore
void Delete(Hash256 blockHash);
void InsertBlockNumber(Hash256 blockHash, long blockNumber);
long? GetBlockNumber(Hash256 blockHash);

/// <summary>
/// Flushed the underlying db.
/// </summary>
void Flush();
}
36 changes: 31 additions & 5 deletions src/Nethermind/Nethermind.Blockchain/IBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,25 @@ public interface IBlockTree : IBlockFinder
/// <param name="header">Header to add</param>
/// <param name="headerOptions"></param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None);
AddBlockResult Insert(BlockHeader header,
BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None);

/// <summary>
/// Inserts a disconnected block body (not for processing).
/// </summary>
/// <param name="block">Block to add</param>
/// <param name="blockWriteFlags">The write flags overrides to be used for this insert operation.</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags bodiesWriteFlags = WriteFlags.None);
AddBlockResult Insert(Block block,
BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None,
WriteFlags blockWriteFlags = WriteFlags.None);

/// <summary>
/// Flushes underlying storages for the specific <paramref name="reason"/>.
/// </summary>
/// <param name="reason">The reason for flushing, showing what changes should be persisted.</param>
void Flush(FlushReason reason);

void UpdateHeadBlock(Hash256 blockHash);

Expand All @@ -83,15 +93,17 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption
/// <param name="block">Block to be included</param>
/// <param name="options">Options for suggesting block, whether a block should be processed or just added to the store.</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult SuggestBlock(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);
AddBlockResult SuggestBlock(Block block,
BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);

/// <summary>
/// Suggests block for inclusion in the block tree. Wait for DB unlock if needed.
/// </summary>
/// <param name="block">Block to be included</param>
/// <param name="options">Options for suggesting block, whether a block should be processed or just added to the store.</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
ValueTask<AddBlockResult> SuggestBlockAsync(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);
ValueTask<AddBlockResult> SuggestBlockAsync(Block block,
BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);

/// <summary>
/// Suggests a block header (without body)
Expand Down Expand Up @@ -179,4 +191,18 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption

void RecalculateTreeLevels();
}

public enum FlushReason
{
/// <summary>
/// Flush after <see cref="IBlockTree.Insert" for a header is called/>
/// is called
/// </summary>
InsertHeaders,

/// <summary>
/// Flush after <see cref="IBlockTree.Insert"/> for a block is called.
/// </summary>
InsertBlocks,
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public async Task Accept(IBlockTreeVisitor blockTreeVisitor, CancellationToken c
public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None) =>
throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls");

public void Flush(FlushReason reason) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Flush)} calls");

public void Insert(IEnumerable<Block> blocks) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls");

public void UpdateHeadBlock(Hash256 blockHash)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Era1/EraImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async Task ImportBlock(long blockNumber)
private void InsertBlockAndReceipts(Block b, TxReceipt[] r, long lastBlockNumber)
{
if (blockTree.FindBlock(b.Number) is null)
blockTree.Insert(b, BlockTreeInsertBlockOptions.SaveHeader | BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, bodiesWriteFlags: WriteFlags.DisableWAL);
blockTree.Insert(b, BlockTreeInsertBlockOptions.SaveHeader | BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, blockWriteFlags: WriteFlags.DisableWAL);
if (!receiptStorage.HasBlock(b.Number, b.Hash!))
receiptStorage.Insert(b, r, true, writeFlags: WriteFlags.DisableWAL, lastBlockNumber: lastBlockNumber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ public bool HasBlock(long blockNumber, Hash256 blockHash)
{
return _blockNumDict.ContainsKey(blockNumber);
}

public void Flush()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public void InsertBlockNumber(Hash256 blockHash, long blockNumber)
{
return _blockNumberDict.TryGetValue(blockHash, out var blockNumber) ? blockNumber : readonlyBaseHeaderStore.GetBlockNumber(blockHash);
}

public void Flush()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,7 @@ void LocalPersistLevel()
public BatchWrite StartBatch() => new(_writeLock);

public ChainLevelInfo? LoadLevel(long number) => _blockInfoDb.Get(number, Rlp.GetStreamDecoder<ChainLevelInfo>(), _blockInfoCache);

public void Flush() => _blockInfoDb.Flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ public interface IChainLevelInfoRepository
void PersistLevel(long number, ChainLevelInfo level, BatchWrite? batch = null);
BatchWrite StartBatch();
ChainLevelInfo? LoadLevel(long number);
void Flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public void Setup()
Substitute.For<ISyncPeerPool>(),
_syncConfig,
new NullSyncReport(),
_blocksDb,
_metadataDb,
LimboLogs.Instance,
flushDbInterval: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
using System.Threading.Tasks;
using Autofac.Features.AttributeFilters;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Db;
using Nethermind.Logging;
using Nethermind.Serialization.Rlp;
using Nethermind.Stats.Model;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
Expand All @@ -40,7 +37,6 @@ public class BodiesSyncFeed : BarrierSyncFeed<BodiesSyncBatch?>
private readonly ISyncReport _syncReport;
private readonly ISyncPeerPool _syncPeerPool;
private readonly ISyncPointers _syncPointers;
private readonly IDb _blocksDb;

private SyncStatusList _syncStatusList;

Expand All @@ -56,7 +52,6 @@ public BodiesSyncFeed(
ISyncPeerPool syncPeerPool,
ISyncConfig syncConfig,
ISyncReport syncReport,
[KeyFilter(DbNames.Blocks)] IDb blocksDb,
[KeyFilter(DbNames.Metadata)] IDb metadataDb,
ILogManager logManager,
long flushDbInterval = DefaultFlushDbInterval)
Expand All @@ -67,7 +62,6 @@ public BodiesSyncFeed(
_syncPeerPool = syncPeerPool;
_syncConfig = syncConfig;
_syncReport = syncReport;
_blocksDb = blocksDb;
_flushDbInterval = flushDbInterval;

if (!_syncConfig.FastSync)
Expand Down Expand Up @@ -169,7 +163,9 @@ private void PostFinishCleanUp()
private void Flush()
{
long lowestInsertedAtPoint = _syncStatusList.LowestInsertWithoutGaps;
_blocksDb.Flush();

_blockTree.Flush(FlushReason.InsertBlocks);

_syncPointers.LowestInsertedBodyNumber = lowestInsertedAtPoint;
}

Expand Down Expand Up @@ -277,7 +273,7 @@ private int InsertBodies(BodiesSyncBatch batch)

private void InsertOneBlock(Block block)
{
_blockTree.Insert(block, BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, bodiesWriteFlags: WriteFlags.DisableWAL);
_blockTree.Insert(block, BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, blockWriteFlags: WriteFlags.DisableWAL);
_syncStatusList.MarkInserted(block.Number);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace Nethermind.Synchronization.FastBlocks
{
public class HeadersSyncFeed : ActivatedSyncFeed<HeadersSyncBatch?>
{

private readonly ILogger _logger;
private readonly ISyncPeerPool _syncPeerPool;
protected readonly ISyncReport _syncReport;
Expand Down Expand Up @@ -638,6 +637,8 @@ protected virtual int InsertHeaders(HeadersSyncBatch batch)

if (lowestInsertedHeader is not null && lowestInsertedHeader.Number < (LowestInsertedBlockHeader?.Number ?? long.MaxValue))
{
// Flush first, so that LowestInsertedHeader is preserved only after the headers are set
_blockTree.Flush(FlushReason.InsertHeaders);
LowestInsertedBlockHeader = lowestInsertedHeader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;

namespace Nethermind.Synchronization.ParallelSync
{
Expand Down

0 comments on commit 5df524c

Please sign in to comment.