Skip to content

Commit

Permalink
code-standards(submit loop): small refactor to submit loop to move ti…
Browse files Browse the repository at this point in the history
…mer to submitter thread (#1014)
  • Loading branch information
danwt authored Aug 13, 2024
1 parent fab177a commit d31bcf1
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
24 changes: 9 additions & 15 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,8 @@ func SubmitLoopInner(
submitter := uchannel.NewNudger() // used to avoid busy waiting (using cpu) on submitter thread

eg.Go(func() error {
// 'trigger': we need one thread to continuously consume the bytes produced channel, and to monitor timer
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since the other thread keeps track of the actual time
defer ticker.Stop()
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
if maxBatchSkew*maxBatchBytes < pendingBytes.Load() {
// too much stuff is pending submission
Expand All @@ -63,13 +62,6 @@ func SubmitLoopInner(
case <-ctx.Done():
return ctx.Err()
case <-trigger.C:
case <-ticker.C:
// It's theoretically possible for the thread scheduler to pause this thread after entering this if statement
// for enough time for the submitter thread to submit all the pending bytes and do the nudge, and then for the
// thread scheduler to wake up this thread after the nudge has been missed, which would be a deadlock.
// Although this is only a theoretical possibility which should never happen in practice, it may be possible, e.g.
// in adverse CPU conditions or tests using compressed timeframes. To be sound, we also nudge with the ticker, which
// has no downside.
}
} else {
select {
Expand All @@ -78,26 +70,27 @@ func SubmitLoopInner(
case n := <-bytesProduced:
pendingBytes.Add(uint64(n))
logger.Info("Added bytes produced to bytes pending submission counter.", "n", n)
case <-ticker.C:
}
}

types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))
submitter.Nudge()
}
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
timeLastSubmission := time.Now()
ticker := time.NewTicker(maxBatchTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
case <-submitter.C:
}
pending := pendingBytes.Load()
types.RollappPendingSubmissionsSkewNumBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewNumBatches.Set(float64(pendingBytes.Load() / maxBatchBytes))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
Expand All @@ -117,6 +110,7 @@ func SubmitLoopInner(
return err
}
timeLastSubmission = time.Now()
ticker.Reset(maxBatchTime)
pending = uatomic.Uint64Sub(&pendingBytes, nConsumed)
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
}
Expand Down
6 changes: 3 additions & 3 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func testSubmitLoopInner(
nProducedBytes.Add(^uint64(consumed - 1)) // subtract

timeLastProgressT := time.Unix(timeLastProgress.Load(), 0)
absoluteMax := int64(1.5 * float64(args.maxTime)) // allow some leeway for code execution
absoluteMax := int64(2 * float64(args.maxTime)) // allow some leeway for code execution. Tests may run on small boxes (GH actions)
timeSinceLast := time.Since(timeLastProgressT).Milliseconds()
require.True(t, timeSinceLast < absoluteMax, "too long since last update", "timeSinceLast", timeSinceLast, "max", absoluteMax)

Expand All @@ -115,7 +115,7 @@ func TestSubmitLoopFastProducerHaltingSubmitter(t *testing.T) {
testSubmitLoop(
t,
testArgs{
nParallel: 100,
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchBytes: 100,
Expand All @@ -136,7 +136,7 @@ func TestSubmitLoopTimer(t *testing.T) {
testSubmitLoop(
t,
testArgs{
nParallel: 100,
nParallel: 50,
testDuration: 2 * time.Second,
batchSkew: 10,
batchBytes: 100,
Expand Down

0 comments on commit d31bcf1

Please sign in to comment.