Skip to content

Commit

Permalink
Prewarm txns in order (of processing) (#7327)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored and kamilchodola committed Aug 14, 2024
1 parent 235c5ca commit a484d0d
Showing 1 changed file with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, Cancel
{
if (targetWorldState.ClearCache())
{
if (_logger.IsWarn) _logger.Warn("Cashes are not empty. Clearing them.");
if (_logger.IsWarn) _logger.Warn("Caches are not empty. Clearing them.");
}

if (!IsGenesisBlock(parentStateRoot) && Environment.ProcessorCount > 2 && !cancellationToken.IsCancellationRequested)
Expand All @@ -54,9 +54,15 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot

try
{
var physicalCoreCount = RuntimeInformation.PhysicalCoreCount;
if (physicalCoreCount < 2)
{
if (_logger.IsDebug) _logger.Debug("Physical core count is less than 2. Skipping pre-warming.");
return;
}
if (_logger.IsDebug) _logger.Debug($"Started pre-warming caches for block {suggestedBlock.Number}.");

ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = Math.Max(1, RuntimeInformation.PhysicalCoreCount - 2), CancellationToken = cancellationToken };
ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken };
IReleaseSpec spec = specProvider.GetSpec(suggestedBlock.Header);

WarmupTransactions(parallelOptions, spec, suggestedBlock, parentStateRoot);
Expand All @@ -74,13 +80,18 @@ void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
i =>
_ =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
catch (Exception ex)
Expand All @@ -98,29 +109,35 @@ void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block
void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
Parallel.For(0, block.Transactions.Length, parallelOptions, i =>
{
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed >= i) return;

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
Transaction tx = block.Transactions[i];
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;

tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx {tx.Hash} with {result}");
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx.Hash}", ex);
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
Expand Down

0 comments on commit a484d0d

Please sign in to comment.