From f366a3cfb4b5bc988c4fbc81e37f4149594cfb2d Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Tue, 14 May 2024 11:54:23 -0700 Subject: [PATCH] Declutter the log/metric in batch processing (#560) --- node/metrics.go | 5 +++++ node/node.go | 17 +++++++---------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/node/metrics.go b/node/metrics.go index 5b7fc53ce7..8435948dba 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -181,6 +181,11 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) { g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize)) } +func (g *Metrics) RecordStoreChunksStage(stage string, dataSize uint64, latency time.Duration) { + g.AcceptBatches(stage, dataSize) + g.ObserveLatency("StoreChunks", stage, float64(latency.Milliseconds())) +} + func (g *Metrics) collectOnchainMetrics() { ticker := time.NewTicker(time.Duration(g.onchainMetricsInterval) * time.Second) defer ticker.Stop() diff --git a/node/node.go b/node/node.go index f3b3658ee0..ab56593fa2 100644 --- a/node/node.go +++ b/node/node.go @@ -321,9 +321,9 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs // Defined only if the batch not already exists and gets stored to database successfully. keys *[][]byte - // Latency (in ms) to store the batch. + // Latency to store the batch. // Defined only if the batch not already exists and gets stored to database successfully. - latency float64 + latency time.Duration } storeChan := make(chan storeResult) go func(n *Node) { @@ -339,7 +339,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs } return } - storeChan <- storeResult{err: nil, keys: keys, latency: float64(time.Since(start).Milliseconds())} + storeChan <- storeResult{err: nil, keys: keys, latency: time.Since(start)} }(n) // Validate batch. @@ -357,8 +357,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs } return nil, fmt.Errorf("failed to validate batch: %w", err) } - n.Metrics.AcceptBatches("validated", batchSize) - n.Metrics.ObserveLatency("StoreChunks", "validated", float64(time.Since(stageTimer).Milliseconds())) + n.Metrics.RecordStoreChunksStage("validated", batchSize, time.Since(stageTimer)) log.Debug("Validate batch took", "duration:", time.Since(stageTimer)) // Before we sign the batch, we should first complete the batch storing successfully. @@ -368,9 +367,8 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs return nil, err } if result.keys != nil { - n.Metrics.AcceptBatches("stored", batchSize) - n.Metrics.ObserveLatency("StoreChunks", "stored", result.latency) - n.Logger.Debug("Store batch succeeded", "batchHeaderHash", batchHeaderHash, "duration:", time.Duration(result.latency*float64(time.Millisecond))) + n.Metrics.RecordStoreChunksStage("stored", batchSize, result.latency) + n.Logger.Debug("Store batch succeeded", "batchHeaderHash", batchHeaderHash, "duration:", result.latency) } else { n.Logger.Warn("Store batch skipped because the batch already exists in the store", "batchHeaderHash", batchHeaderHash) } @@ -378,8 +376,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs // Sign batch header hash if all validation checks pass and data items are written to database. stageTimer = time.Now() sig := n.KeyPair.SignMessage(batchHeaderHash) - n.Metrics.AcceptBatches("signed", batchSize) - n.Metrics.ObserveLatency("StoreChunks", "signed", float64(time.Since(stageTimer).Milliseconds())) + n.Metrics.RecordStoreChunksStage("signed", batchSize, time.Since(stageTimer)) log.Debug("Sign batch succeeded", "pubkey", hexutil.Encode(n.KeyPair.GetPubKeyG2().Serialize()), "duration", time.Since(stageTimer)) log.Debug("Exiting process batch", "duration", time.Since(start))