Declutter the log/metric in batch processing (#560)
jianoaix authored May 14, 2024
1 parent 3927a96 commit f366a3c
Showing 2 changed files with 12 additions and 10 deletions.
5 changes: 5 additions & 0 deletions node/metrics.go
@@ -181,6 +181,11 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
 	g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize))
 }
 
+func (g *Metrics) RecordStoreChunksStage(stage string, dataSize uint64, latency time.Duration) {
+	g.AcceptBatches(stage, dataSize)
+	g.ObserveLatency("StoreChunks", stage, float64(latency.Milliseconds()))
+}
+
 func (g *Metrics) collectOnchainMetrics() {
 	ticker := time.NewTicker(time.Duration(g.onchainMetricsInterval) * time.Second)
 	defer ticker.Stop()
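
For reference, here is a minimal, self-contained sketch of the Metrics shape this helper relies on. The diff only shows the call sites of AcceptBatches and ObserveLatency, so the prometheus vectors, metric names, and labels below are assumptions for illustration, not the repository's exact fields:

package metrics

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Metrics is an illustrative stand-in for the node's Metrics type; the real
// struct in node/metrics.go has additional fields, a registry, and an HTTP
// endpoint that are omitted here.
type Metrics struct {
	AccuBatches *prometheus.CounterVec // accumulated batch size per status
	Latency     *prometheus.SummaryVec // per-method, per-stage latency in ms
}

// NewMetrics registers the two vectors; the metric and label names are made
// up for this sketch.
func NewMetrics(reg prometheus.Registerer) *Metrics {
	m := &Metrics{
		AccuBatches: prometheus.NewCounterVec(
			prometheus.CounterOpts{Name: "node_accu_batches", Help: "Accumulated batches processed by the node"},
			[]string{"type", "status"},
		),
		Latency: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{Name: "node_request_latency_ms", Help: "Request latency in milliseconds"},
			[]string{"method", "stage"},
		),
	}
	reg.MustRegister(m.AccuBatches, m.Latency)
	return m
}

// AcceptBatches mirrors the call site visible in the diff: it adds the batch
// size under the given status label.
func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
	g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize))
}

// ObserveLatency records latency in milliseconds for a (method, stage) pair,
// matching how it is called in the diff.
func (g *Metrics) ObserveLatency(method, stage string, latencyMs float64) {
	g.Latency.WithLabelValues(method, stage).Observe(latencyMs)
}

// RecordStoreChunksStage is the helper added by this commit: one call records
// both the size counter and the StoreChunks latency for a processing stage.
func (g *Metrics) RecordStoreChunksStage(stage string, dataSize uint64, latency time.Duration) {
	g.AcceptBatches(stage, dataSize)
	g.ObserveLatency("StoreChunks", stage, float64(latency.Milliseconds()))
}

The point of the commit is the last method: each StoreChunks stage now records its size counter and latency with a single call instead of two.
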
17 changes: 7 additions & 10 deletions node/node.go
@@ -321,9 +321,9 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
 		// Defined only if the batch not already exists and gets stored to database successfully.
 		keys *[][]byte
 
-		// Latency (in ms) to store the batch.
+		// Latency to store the batch.
 		// Defined only if the batch not already exists and gets stored to database successfully.
-		latency float64
+		latency time.Duration
 	}
 	storeChan := make(chan storeResult)
 	go func(n *Node) {
@@ -339,7 +339,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
 			}
 			return
 		}
-		storeChan <- storeResult{err: nil, keys: keys, latency: float64(time.Since(start).Milliseconds())}
+		storeChan <- storeResult{err: nil, keys: keys, latency: time.Since(start)}
 	}(n)
 
 	// Validate batch.
@@ -357,8 +357,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
 		}
 		return nil, fmt.Errorf("failed to validate batch: %w", err)
 	}
-	n.Metrics.AcceptBatches("validated", batchSize)
-	n.Metrics.ObserveLatency("StoreChunks", "validated", float64(time.Since(stageTimer).Milliseconds()))
+	n.Metrics.RecordStoreChunksStage("validated", batchSize, time.Since(stageTimer))
 	log.Debug("Validate batch took", "duration:", time.Since(stageTimer))
 
 	// Before we sign the batch, we should first complete the batch storing successfully.
@@ -368,18 +367,16 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
 		return nil, err
 	}
 	if result.keys != nil {
-		n.Metrics.AcceptBatches("stored", batchSize)
-		n.Metrics.ObserveLatency("StoreChunks", "stored", result.latency)
-		n.Logger.Debug("Store batch succeeded", "batchHeaderHash", batchHeaderHash, "duration:", time.Duration(result.latency*float64(time.Millisecond)))
+		n.Metrics.RecordStoreChunksStage("stored", batchSize, result.latency)
+		n.Logger.Debug("Store batch succeeded", "batchHeaderHash", batchHeaderHash, "duration:", result.latency)
 	} else {
 		n.Logger.Warn("Store batch skipped because the batch already exists in the store", "batchHeaderHash", batchHeaderHash)
 	}
 
 	// Sign batch header hash if all validation checks pass and data items are written to database.
 	stageTimer = time.Now()
 	sig := n.KeyPair.SignMessage(batchHeaderHash)
-	n.Metrics.AcceptBatches("signed", batchSize)
-	n.Metrics.ObserveLatency("StoreChunks", "signed", float64(time.Since(stageTimer).Milliseconds()))
+	n.Metrics.RecordStoreChunksStage("signed", batchSize, time.Since(stageTimer))
 	log.Debug("Sign batch succeeded", "pubkey", hexutil.Encode(n.KeyPair.GetPubKeyG2().Serialize()), "duration", time.Since(stageTimer))
 
 	log.Debug("Exiting process batch", "duration", time.Since(start))
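The node.go side of the change also switches storeResult.latency from float64 milliseconds to time.Duration, so the duration can be logged directly and converted to milliseconds only inside the metrics helper. Below is a stripped-down sketch of the pattern ProcessBatch follows — store in a background goroutine, validate concurrently, then join on the channel — with hypothetical storeBatch/validateBatch stand-ins for the node's real store and validator calls:

package main

import (
	"fmt"
	"time"
)

// storeResult mirrors the shape used in ProcessBatch after this commit: the
// latency is a time.Duration rather than a float64 of milliseconds.
type storeResult struct {
	err     error
	keys    *[][]byte
	latency time.Duration
}

// storeBatch and validateBatch are hypothetical stand-ins for the node's
// real Store and Validator calls.
func storeBatch() (*[][]byte, error) {
	time.Sleep(10 * time.Millisecond)
	return &[][]byte{}, nil
}

func validateBatch() error {
	time.Sleep(5 * time.Millisecond)
	return nil
}

func processBatch() error {
	storeChan := make(chan storeResult, 1)

	// Store the batch in the background and report keys plus elapsed time.
	go func() {
		start := time.Now()
		keys, err := storeBatch()
		if err != nil {
			storeChan <- storeResult{err: fmt.Errorf("store failed: %w", err)}
			return
		}
		storeChan <- storeResult{err: nil, keys: keys, latency: time.Since(start)}
	}()

	// Validate while the store is in flight.
	if err := validateBatch(); err != nil {
		return fmt.Errorf("failed to validate batch: %w", err)
	}

	// Join: the batch must be stored successfully before it would be signed.
	result := <-storeChan
	if result.err != nil {
		return result.err
	}
	if result.keys != nil {
		// A time.Duration logs as a human-readable value (e.g. "10ms").
		fmt.Println("store batch succeeded, duration:", result.latency)
	} else {
		fmt.Println("store batch skipped: batch already exists")
	}
	return nil
}

func main() {
	if err := processBatch(); err != nil {
		fmt.Println("error:", err)
	}
}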
