Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Channels rather than BlockingCollection for block processing #7790

Merged
merged 11 commits into from
Dec 11, 2024
10 changes: 9 additions & 1 deletion src/Nethermind/Nethermind.Consensus.Ethash/Ethash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,15 @@ private IEthashDataSet BuildCache(uint epoch)
_cacheStopwatch.Restart();
IEthashDataSet dataSet = new EthashCache(cacheSize, seed.Bytes);
_cacheStopwatch.Stop();
if (_logger.IsInfo) _logger.Info($"Cache for epoch {epoch} with size {cacheSize} and seed {seed.Bytes.ToHexString()} built in {_cacheStopwatch.ElapsedMilliseconds}ms");
if (_logger.IsInfo)
{
var seedText = seed.Bytes.ToHexString(withZeroX: true);
if (seedText.Length > 17)
{
seedText = $"{seedText[..8]}...{seedText[^6..]}";
}
_logger.Info($"Cache for epoch {epoch} with size {cacheSize} and seed {seedText} built in {_cacheStopwatch.ElapsedMilliseconds}ms");
}
return dataSet;
}

Expand Down
155 changes: 62 additions & 93 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockchainProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Find;
using Nethermind.Core;
using Nethermind.Core.Attributes;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Threading;
using Nethermind.Evm.Tracing;
using Nethermind.Evm.Tracing.GethStyle;
using Nethermind.Evm.Tracing.ParityStyle;
Expand All @@ -27,10 +28,10 @@ namespace Nethermind.Consensus.Processing;
public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessingQueue
{
public int SoftMaxRecoveryQueueSizeInTx = 10000; // adjust based on tx or gas
public const int MaxProcessingQueueSize = 2000; // adjust based on tx or gas
public const int MaxProcessingQueueSize = 2048; // adjust based on tx or gas

[ThreadStatic] private static bool _isMainProcessingThread;
public static bool IsMainProcessingThread => _isMainProcessingThread;
private static AsyncLocal<bool> _isMainProcessingThread = new();
public static bool IsMainProcessingThread => _isMainProcessingThread.Value;
public bool IsMainProcessor { get; init; }

public ITracerBag Tracers => _compositeBlockTracer;
Expand All @@ -42,10 +43,10 @@ public sealed class BlockchainProcessor : IBlockchainProcessor, IBlockProcessing
private readonly IBlockTree _blockTree;
private readonly ILogger _logger;

private readonly BlockingCollection<BlockRef> _recoveryQueue = new(new ConcurrentQueue<BlockRef>());
private readonly Channel<BlockRef> _recoveryQueue = Channel.CreateUnbounded<BlockRef>();
private bool _recoveryComplete = false;

private readonly BlockingCollection<BlockRef> _blockQueue = new(new ConcurrentQueue<BlockRef>(),
MaxProcessingQueueSize);
private readonly Channel<BlockRef> _blockQueue = Channel.CreateBounded<BlockRef>(MaxProcessingQueueSize);

private int _queueCount;

Expand Down Expand Up @@ -122,19 +123,19 @@ public void Enqueue(Block block, ProcessingOptions processingOptions)
? new BlockRef(blockHash, processingOptions)
: new BlockRef(block, processingOptions);

if (!_recoveryQueue.IsAddingCompleted)
if (!_recoveryComplete)
{
Interlocked.Increment(ref _queueCount);
try
{
_recoveryQueue.Add(blockRef);
_recoveryQueue.Writer.TryWrite(blockRef);
if (_logger.IsTrace) _logger.Trace($"A new block {block.ToString(Block.Format.Short)} enqueued for processing.");
}
catch (Exception e)
{
Interlocked.Decrement(ref _queueCount);
BlockRemoved?.Invoke(this, new BlockRemovedEventArgs(blockHash, ProcessingResult.QueueException, e));
if (e is not InvalidOperationException || !_recoveryQueue.IsAddingCompleted)
if (e is not InvalidOperationException || !_recoveryComplete)
{
throw;
}
Expand All @@ -151,59 +152,42 @@ public void Start()

public async Task StopAsync(bool processRemainingBlocks = false)
{
_recoveryComplete = true;
if (processRemainingBlocks)
{
_recoveryQueue.CompleteAdding();
_recoveryQueue.Writer.TryComplete();
await (_recoveryTask ?? Task.CompletedTask);
_blockQueue.CompleteAdding();
_blockQueue.Writer.TryComplete();
}
else
{
_loopCancellationSource?.Cancel();
_recoveryQueue.CompleteAdding();
_blockQueue.CompleteAdding();
_recoveryQueue.Writer.TryComplete();
_blockQueue.Writer.TryComplete();
}

await Task.WhenAll(_recoveryTask ?? Task.CompletedTask, _processorTask ?? Task.CompletedTask);
if (_logger.IsInfo) _logger.Info("Blockchain Processor shutdown complete.. please wait for all components to close");
}

private Task RunRecovery()
private async Task RunRecovery()
{
TaskCompletionSource tcs = new();

Thread thread = new(() =>
try
{
try
{
RunRecoveryLoop();
if (_logger.IsDebug) _logger.Debug("Sender address recovery complete.");
}
catch (OperationCanceledException)
{
if (_logger.IsDebug) _logger.Debug("Sender address recovery stopped.");
}
catch (Exception ex)
{
if (_logger.IsError) _logger.Error("Sender address recovery encountered an exception.", ex);
}
finally
{
tcs.SetResult();
}
})
await RunRecoveryLoop();
if (_logger.IsDebug) _logger.Debug("Sender address recovery complete.");
}
catch (OperationCanceledException)
{
IsBackground = true,
Name = "Block Recovery",
// Boost priority to make sure we process blocks as fast as possible
Priority = ThreadPriority.AboveNormal,
};
thread.Start();

return tcs.Task;
if (_logger.IsDebug) _logger.Debug("Sender address recovery stopped.");
}
catch (Exception ex)
{
if (_logger.IsError) _logger.Error("Sender address recovery encountered an exception.", ex);
}
}

private void RunRecoveryLoop()
private async Task RunRecoveryLoop()
{
void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Exception? exception = null)
{
Expand All @@ -212,9 +196,9 @@ void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Except
FireProcessingQueueEmpty();
}

if (_logger.IsDebug) _logger.Debug($"Starting recovery loop - {_blockQueue.Count} blocks waiting in the queue.");
if (_logger.IsDebug) _logger.Debug($"Starting recovery loop - {_blockQueue.Reader.Count} blocks waiting in the queue.");
_lastProcessedBlock = DateTime.UtcNow;
foreach (BlockRef blockRef in _recoveryQueue.GetConsumingEnumerable(_loopCancellationSource.Token))
await foreach (BlockRef blockRef in _recoveryQueue.Reader.ReadAllAsync(_loopCancellationSource.Token))
{
try
{
Expand All @@ -226,9 +210,9 @@ void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Except

try
{
_blockQueue.Add(blockRef);
await _blockQueue.Writer.WriteAsync(blockRef);
}
catch (Exception e)
catch (Exception e) when (e is not OperationCanceledException)
{
DecrementQueue(blockRef.BlockHash, ProcessingResult.QueueException, e);

Expand All @@ -255,54 +239,37 @@ void DecrementQueue(Hash256 blockHash, ProcessingResult processingResult, Except
}
}

private Task RunProcessing()
private async Task RunProcessing()
{
TaskCompletionSource tcs = new();
_isMainProcessingThread.Value = IsMainProcessor;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is used to determine if tries should be recovered from network and whether the tx pool should be notified of invalidated accounts in its cache (both should only happen from the main processor)


Thread thread = new(() =>
try
{
_isMainProcessingThread = IsMainProcessor;

try
{
RunProcessingLoop();
if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} complete.");
}
catch (OperationCanceledException)
{
if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} stopped.");
}
catch (Exception ex)
{
if (_logger.IsError) _logger.Error($"{nameof(BlockchainProcessor)} encountered an exception.", ex);
}
finally
{
tcs.SetResult();
}
})
await RunProcessingLoop();
if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} complete.");
}
catch (OperationCanceledException)
{
IsBackground = true,
Name = "Block Processor",
// Boost priority to make sure we process blocks as fast as possible
Priority = ThreadPriority.Highest,
};
thread.Start();

return tcs.Task;
if (_logger.IsDebug) _logger.Debug($"{nameof(BlockchainProcessor)} stopped.");
}
catch (Exception ex)
{
if (_logger.IsError) _logger.Error($"{nameof(BlockchainProcessor)} encountered an exception.", ex);
}
}

private void RunProcessingLoop()
private async Task RunProcessingLoop()
{
if (_logger.IsDebug) _logger.Debug($"Starting block processor - {_blockQueue.Count} blocks waiting in the queue.");
if (_logger.IsDebug) _logger.Debug($"Starting block processor - {_blockQueue.Reader.Count} blocks waiting in the queue.");

FireProcessingQueueEmpty();

GCScheduler.Instance.SwitchOnBackgroundGC(0);
foreach (BlockRef blockRef in _blockQueue.GetConsumingEnumerable(_loopCancellationSource.Token))
await foreach (BlockRef blockRef in _blockQueue.Reader.ReadAllAsync(_loopCancellationSource.Token))
{
using var handle = Thread.CurrentThread.BoostPriorityHighest();
// Have block, switch off background GC timer
GCScheduler.Instance.SwitchOffBackgroundGC(_blockQueue.Count);
GCScheduler.Instance.SwitchOffBackgroundGC(_blockQueue.Reader.Count);

try
{
Expand All @@ -329,7 +296,7 @@ private void RunProcessingLoop()
BlockRemoved?.Invoke(this, new BlockRemovedEventArgs(blockRef.BlockHash, ProcessingResult.Success));
}
}
catch (Exception exception)
catch (Exception exception) when (exception is not OperationCanceledException)
{
if (_logger.IsWarn) _logger.Warn($"Processing loop threw an exception. Block: {blockRef}, Exception: {exception}");
BlockRemoved?.Invoke(this, new BlockRemovedEventArgs(blockRef.BlockHash, ProcessingResult.Exception, exception));
Expand All @@ -339,10 +306,10 @@ private void RunProcessingLoop()
Interlocked.Decrement(ref _queueCount);
}

if (_logger.IsTrace) _logger.Trace($"Now {_blockQueue.Count} blocks waiting in the queue.");
if (_logger.IsTrace) _logger.Trace($"Now {_blockQueue.Reader.Count} blocks waiting in the queue.");
FireProcessingQueueEmpty();

GCScheduler.Instance.SwitchOnBackgroundGC(_blockQueue.Count);
GCScheduler.Instance.SwitchOnBackgroundGC(_blockQueue.Reader.Count);
}

if (_logger.IsInfo) _logger.Info("Block processor queue stopped.");
Expand Down Expand Up @@ -389,6 +356,9 @@ private void FireProcessingQueueEmpty()
return null;
}

bool readonlyChain = options.ContainsFlag(ProcessingOptions.ReadOnlyChain);
if (!readonlyChain) _stats.CaptureStartStats();

ProcessingBranch processingBranch = PrepareProcessingBranch(suggestedBlock, options);
PrepareBlocksToProcess(suggestedBlock, options, processingBranch);

Expand All @@ -412,14 +382,12 @@ private void FireProcessingQueueEmpty()
if (_logger.IsDebug) _logger.Debug($"Skipped processing of {suggestedBlock.ToString(Block.Format.FullHashAndNumber)}, last processed is null: {true}, processedBlocks.Length: {processedBlocks.Length}");
}

bool readonlyChain = options.ContainsFlag(ProcessingOptions.ReadOnlyChain);
if (!readonlyChain)
{
long blockProcessingTimeInMicrosecs = _stopwatch.ElapsedMicroseconds();
Metrics.LastBlockProcessingTimeInMs = blockProcessingTimeInMicrosecs / 1000;
Metrics.RecoveryQueueSize = _recoveryQueue.Count;
Metrics.ProcessingQueueSize = _blockQueue.Count;

Metrics.RecoveryQueueSize = _recoveryQueue.Reader.Count;
Metrics.ProcessingQueueSize = _blockQueue.Reader.Count;
_stats.UpdateStats(lastProcessed, processingBranch.Root, blockProcessingTimeInMicrosecs);
}

Expand Down Expand Up @@ -761,8 +729,9 @@ private bool RunSimpleChecksAheadOfProcessing(Block suggestedBlock, ProcessingOp

public void Dispose()
{
_recoveryQueue.Dispose();
_blockQueue.Dispose();
_recoveryComplete = true;
_recoveryQueue.Writer.TryComplete();
_blockQueue.Writer.TryComplete();
_loopCancellationSource?.Dispose();
_blockTree.NewBestSuggestedBlock -= OnNewBestBlock;
_blockTree.NewHeadBlock -= OnNewHeadBlock;
Expand Down
Loading