From d31bcf1f2ad92e237ab663d1429d148887a71e40 Mon Sep 17 00:00:00 2001 From: Daniel T <30197399+danwt@users.noreply.github.com> Date: Tue, 13 Aug 2024 10:14:38 +0100 Subject: [PATCH] code-standards(submit loop): small refactor to submit loop to move timer to submitter thread (#1014) --- block/submit.go | 24 +++++++++--------------- block/submit_loop_test.go | 6 +++--- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/block/submit.go b/block/submit.go index d55423f6c..8c5dd5d18 100644 --- a/block/submit.go +++ b/block/submit.go @@ -52,9 +52,8 @@ func SubmitLoopInner( submitter := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread eg.Go(func() error { - // 'trigger': we need one thread to continuously consume the bytes produced channel, and to monitor timer - ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since the other thread keeps track of the actual time - defer ticker.Stop() + // 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop + // if it gets too far ahead. for { if maxBatchSkew*maxBatchBytes < pendingBytes.Load() { // too much stuff is pending submission @@ -63,13 +62,6 @@ func SubmitLoopInner( case <-ctx.Done(): return ctx.Err() case <-trigger.C: - case <-ticker.C: - // It's theoretically possible for the thread scheduler to pause this thread after entering this if statement - // for enough time for the submitter thread to submit all the pending bytes and do the nudge, and then for the - // thread scheduler to wake up this thread after the nudge has been missed, which would be a deadlock. - // Although this is only a theoretical possibility which should never happen in practice, it may be possible, e.g. - // in adverse CPU conditions or tests using compressed timeframes. To be sound, we also nudge with the ticker, which - // has no downside. } } else { select { @@ -78,26 +70,27 @@ func SubmitLoopInner( case n := <-bytesProduced: pendingBytes.Add(uint64(n)) logger.Info("Added bytes produced to bytes pending submission counter.", "n", n) - case <-ticker.C: } } - - types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load())) - types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes)) submitter.Nudge() } }) eg.Go(func() error { - // 'submitter': this thread actually creates and submits batches + // 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production timeLastSubmission := time.Now() + ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup for { select { case <-ctx.Done(): return ctx.Err() + case <-ticker.C: case <-submitter.C: } pending := pendingBytes.Load() + types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load())) + types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes)) + // while there are accumulated blocks, create and submit batches!! for { done := ctx.Err() != nil @@ -117,6 +110,7 @@ func SubmitLoopInner( return err } timeLastSubmission = time.Now() + ticker.Reset(maxBatchTime) pending = uatomic.Uint64Sub(&pendingBytes, nConsumed) logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level } diff --git a/block/submit_loop_test.go b/block/submit_loop_test.go index 8203a4536..25ee2e14f 100644 --- a/block/submit_loop_test.go +++ b/block/submit_loop_test.go @@ -99,7 +99,7 @@ func testSubmitLoopInner( nProducedBytes.Add(^uint64(consumed - 1)) // subtract timeLastProgressT := time.Unix(timeLastProgress.Load(), 0) - absoluteMax := int64(1.5 * float64(args.maxTime)) // allow some leeway for code execution + absoluteMax := int64(2 * float64(args.maxTime)) // allow some leeway for code execution. Tests may run on small boxes (GH actions) timeSinceLast := time.Since(timeLastProgressT).Milliseconds() require.True(t, timeSinceLast < absoluteMax, "too long since last update", "timeSinceLast", timeSinceLast, "max", absoluteMax) @@ -115,7 +115,7 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) { testSubmitLoop( t, testArgs{ - nParallel: 100, + nParallel: 50, testDuration: 2 * time.Second, batchSkew: 10, batchBytes: 100, @@ -136,7 +136,7 @@ func TestSubmitLoopTimer(t *testing.T) { testSubmitLoop( t, testArgs{ - nParallel: 100, + nParallel: 50, testDuration: 2 * time.Second, batchSkew: 10, batchBytes: 100,