From e8a8b73c02ecb5ec0d90e62b5cbd76693f7c79b6 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 21 Nov 2024 10:55:11 +0100 Subject: [PATCH 1/7] fix --- block/submit.go | 1 + 1 file changed, 1 insertion(+) diff --git a/block/submit.go b/block/submit.go index 2105578d6..227302a35 100644 --- a/block/submit.go +++ b/block/submit.go @@ -110,6 +110,7 @@ func SubmitLoopInner( UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime()) if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) { + pendingBytes.Store(pending) break } From bf8873bee13e8530a470f4f032e6e2f97b3accb2 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 21 Nov 2024 11:36:10 +0100 Subject: [PATCH 2/7] update ticker when break --- block/submit.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/submit.go b/block/submit.go index 227302a35..cbbdf6132 100644 --- a/block/submit.go +++ b/block/submit.go @@ -111,6 +111,7 @@ func SubmitLoopInner( if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) { pendingBytes.Store(pending) + ticker.Reset(maxBatchSubmitTime) break } @@ -129,7 +130,6 @@ func SubmitLoopInner( } return err } - ticker.Reset(maxBatchSubmitTime) pending = uint64(unsubmittedBlocksBytes()) logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level } From 47a91b8cd7a5b78206b54c5650043c5c4e7549d6 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 21 Nov 2024 12:59:52 +0100 Subject: [PATCH 3/7] restart submitting when skew time is correct without waiting for all pending batches --- block/submit.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/block/submit.go b/block/submit.go index cbbdf6132..18c77b08f 100644 --- a/block/submit.go +++ b/block/submit.go @@ -131,9 +131,11 @@ func SubmitLoopInner( return err } pending = uint64(unsubmittedBlocksBytes()) - logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending) // TODO: debug level + if batchSkewTime() < maxProduceSubmitSkewTime { + trigger.Nudge() + } + logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime()) // TODO: debug level } - trigger.Nudge() } }) From f9d1fd891efa9d9f94495b8ac38f12bf1fc7444f Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Thu, 21 Nov 2024 17:39:22 +0100 Subject: [PATCH 4/7] improve error --- block/produce.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/produce.go b/block/produce.go index c6a2e5352..28f683dd2 100644 --- a/block/produce.go +++ b/block/produce.go @@ -82,7 +82,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) return nil case bytesProducedC <- bytesProducedN: default: - evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)} + evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time %s: %w", m.Conf.MaxSkewTime, gerrc.ErrResourceExhausted)} uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime) select { From 45c9b2bf305874510220fcca00a777c3d1cc01ae Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Fri, 22 Nov 2024 14:21:19 +0100 Subject: [PATCH 5/7] improving log msg --- block/produce.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/block/produce.go b/block/produce.go index 28f683dd2..440b03cad 100644 --- a/block/produce.go +++ b/block/produce.go @@ -82,9 +82,9 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) return nil case bytesProducedC <- bytesProducedN: default: - evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time %s: %w", m.Conf.MaxSkewTime, gerrc.ErrResourceExhausted)} + evt := &events.DataHealthStatus{Error: fmt.Errorf("Block production paused. Time between last block produced and last block submitted higher than max skew time: %s last block in settlement time: %s %w", m.Conf.MaxSkewTime, time.Unix(0, m.LastBlockTimeInSettlement.Load()), gerrc.ErrResourceExhausted)} uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList) - m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime) + m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime, "Last block in settlement time", time.Unix(0, m.LastBlockTimeInSettlement.Load())) select { case <-ctx.Done(): return nil From 5cf314c881c021ace1e05b9ae27e902eefbb4edd Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Mon, 25 Nov 2024 12:50:29 +0100 Subject: [PATCH 6/7] comments --- block/submit.go | 25 ++++++++++++------------- config/toml.go | 2 ++ 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/block/submit.go b/block/submit.go index 18c77b08f..bdd6140b4 100644 --- a/block/submit.go +++ b/block/submit.go @@ -44,10 +44,10 @@ func SubmitLoopInner( ctx context.Context, logger types.Logger, bytesProduced chan int, // a channel of block and commit bytes produced - maxProduceSubmitSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped. - unsubmittedBlocksNum func() uint64, - unsubmittedBlocksBytes func() int, - batchSkewTime func() time.Duration, + maxSkewTime time.Duration, // max time between last submitted block and last produced block allowed. if this threshold is reached block production is stopped. + unsubmittedBlocksNum func() uint64, // func that returns the amount of non-submitted blocks + unsubmittedBlocksBytes func() int, // func that returns bytes from non-submitted blocks + batchSkewTime func() time.Duration, // func that returns measured time between last submitted block and last produced block maxBatchSubmitTime time.Duration, // max time to allow between batches maxBatchSubmitBytes uint64, // max size of serialised batch in bytes createAndSubmitBatch func(maxSizeBytes uint64) (bytes uint64, err error), @@ -73,9 +73,8 @@ func SubmitLoopInner( submitter.Nudge() - if maxProduceSubmitSkewTime < batchSkewTime() { - // too much stuff is pending submission - // we block here until we get a progress nudge from the submitter thread + // if the time between the last produced block and last submitted is greater than maxSkewTime we block here until we get a progress nudge from the submitter thread + if maxSkewTime < batchSkewTime() { select { case <-ctx.Done(): return ctx.Err() @@ -87,8 +86,8 @@ func SubmitLoopInner( }) eg.Go(func() error { - // 'submitter': this thread actually creates and submits batches, and will do it on a timer if he isn't nudged by block production - ticker := time.NewTicker(maxBatchSubmitTime / 10) // interval does not need to match max batch time since we keep track anyway, it's just to wakeup + // 'submitter': this thread actually creates and submits batches. this thread is woken up every batch_submit_time (in addition to every block produced) to check if there is anything to submit even if no new blocks have been produced + ticker := time.NewTicker(maxBatchSubmitTime) for { select { case <-ctx.Done(): @@ -110,8 +109,6 @@ func SubmitLoopInner( UpdateBatchSubmissionGauges(pending, unsubmittedBlocksNum(), batchSkewTime()) if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) { - pendingBytes.Store(pending) - ticker.Reset(maxBatchSubmitTime) break } @@ -131,11 +128,13 @@ func SubmitLoopInner( return err } pending = uint64(unsubmittedBlocksBytes()) - if batchSkewTime() < maxProduceSubmitSkewTime { + if batchSkewTime() < maxSkewTime { trigger.Nudge() } - logger.Info("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime()) // TODO: debug level + logger.Debug("Submitted a batch to both sub-layers.", "n bytes consumed from pending", nConsumed, "pending after", pending, "skew time", batchSkewTime()) } + // update pendingBytes with non submitted block bytes after all pending batches have been submitted + pendingBytes.Store(pending) } }) diff --git a/config/toml.go b/config/toml.go index 637e4b19c..9ee3d544d 100644 --- a/config/toml.go +++ b/config/toml.go @@ -69,7 +69,9 @@ const defaultConfigTemplate = ` block_time = "{{ .BlockManagerConfig.BlockTime }}" # block production interval in case of no transactions ("0s" produces empty blocks) max_idle_time = "{{ .BlockManagerConfig.MaxIdleTime }}" +# block production interval after block with no transactions max_proof_time = "{{ .BlockManagerConfig.MaxProofTime }}" +# maximum time the node will produce blocks without submitting to SL before stopping block production max_skew_time = "{{ .BlockManagerConfig.MaxSkewTime }}" From 575724fba25b63bb716f6f139feba3ff7ac3a5c1 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Mon, 25 Nov 2024 22:04:32 +0100 Subject: [PATCH 7/7] comment --- block/submit.go | 1 + 1 file changed, 1 insertion(+) diff --git a/block/submit.go b/block/submit.go index 6a3399b84..7f6dab4e1 100644 --- a/block/submit.go +++ b/block/submit.go @@ -128,6 +128,7 @@ func SubmitLoopInner( return err } pending = uint64(unsubmittedBlocksBytes()) //nolint:gosec // bytes size is always positive + // after new batch submitted we check the skew time to wake up 'trigger' thread and restart block production if batchSkewTime() < maxSkewTime { trigger.Nudge() }