Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(manager): resume block production right when skew < max skew #1252

Merged
merged 8 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime)
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load()))
select {
case <-ctx.Done():
return nil
Expand Down
28 changes: 15 additions & 13 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ func SubmitLoopInner(
ctx context.Context,
logger types.Logger,
bytesProduced chan int, // a channel of block and commit bytes produced
maxProduceSubmitSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksBytes func() int,
batchSkewTime func() time.Duration,
maxSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped.
unsubmittedBlocksNum func() uint64, // func that returns the amount of non-submitted blocks
unsubmittedBlocksBytes func() int, // func that returns bytes from non-submitted blocks
batchSkewTime func() time.Duration, // func that returns measured time between last submitted block and last produced block
maxBatchSubmitTime time.Duration, // max time to allow between batches
maxBatchSubmitBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error),
Expand All @@ -73,9 +73,8 @@ func SubmitLoopInner(

submitter.Nudge()

if maxProduceSubmitSkewTime < batchSkewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
// if the time between the last produced block and last submitted is greater than maxSkewTime we block here until we get a progress nudge from the submitter thread
if maxSkewTime < batchSkewTime() {
select {
case <-ctx.Done():
return nil
Expand All @@ -87,8 +86,8 @@ func SubmitLoopInner(
})

eg.Go(func() error {
// 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production
ticker := time.NewTicker(maxBatchSubmitTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup
// 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time (in addition to every block produced) to check if there is anything to submit even if no new blocks have been produced
ticker := time.NewTicker(maxBatchSubmitTime)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -128,11 +127,14 @@ func SubmitLoopInner(
}
return err
}
ticker.Reset(maxBatchSubmitTime)
pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive
logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level
pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive
if batchSkewTime() < maxSkewTime {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u elaborate on the use of this if?
why not call trigger.Nudge() after submission loop finished as before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it is called after the loop, the block production wont be restarted until all pending batches, that have been created before stopping block production, are submitted to SL. this way block production will be restarted after the skew time condition is met without having to wait.

trigger.Nudge()
}
logger.Debug("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime())
}
trigger.Nudge()
// update pendingBytes with non submitted block bytes after all pending batches have been submitted
pendingBytes.Store(pending)
}
})

Expand Down
2 changes: 2 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ const defaultConfigTemplate = `
block_time = "{{ .BlockManagerConfig.BlockTime }}"
# block production interval in case of no transactions ("0s" produces empty blocks)
max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}"
# block production interval after block with no transactions
max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}"
# maximum time the node will produce blocks without submitting to SL before stopping block production
max_skew_time = "{{ .BlockManagerConfig.MaxSkewTime }}"


Expand Down
Loading