Add metrics to the dispatcher (#968)
Signed-off-by: Cody Littley <[email protected]>
cody-littley authored Dec 9, 2024
1 parent 9170987 commit cc34304
Showing 4 changed files with 385 additions and 3 deletions.
1 change: 1 addition & 0 deletions disperser/cmd/controller/main.go
@@ -160,6 +160,7 @@ func RunController(ctx *cli.Context) error {
sigAgg,
nodeClientManager,
logger,
metricsRegistry,
)
if err != nil {
return fmt.Errorf("failed to create dispatcher: %v", err)
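For context on that new final argument: NewDispatcher (see dispatcher.go below) now receives a *prometheus.Registry. A minimal sketch of how a caller might create and expose such a registry, assuming promhttp for exposition; the port, mux, and surrounding wiring are illustrative assumptions, not code from this commit:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Registry handed through RunController to NewDispatcher. Using a dedicated
	// registry instead of prometheus.DefaultRegisterer keeps the dispatcher's
	// metrics isolated and easy to test.
	metricsRegistry := prometheus.NewRegistry()

	// Serve everything registered on it, e.g. at :9101/metrics (port assumed).
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.HandlerFor(metricsRegistry, promhttp.HandlerOpts{}))
	go func() {
		_ = http.ListenAndServe(":9101", mux)
	}()

	// ... build the dispatcher's other dependencies, then:
	// dispatcher, err := controller.NewDispatcher(config, ..., logger, metricsRegistry)
}

Because the registry is threaded through as a parameter rather than pulled from a global, the call-site change in main.go above stays a one-line addition.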
61 changes: 59 additions & 2 deletions disperser/controller/dispatcher.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"math"
"time"

@@ -38,6 +39,7 @@ type Dispatcher struct {
aggregator core.SignatureAggregator
nodeClientManager NodeClientManager
logger logging.Logger
metrics *dispatcherMetrics

cursor *blobstore.StatusIndexCursor
}
@@ -57,6 +59,7 @@ func NewDispatcher(
aggregator core.SignatureAggregator,
nodeClientManager NodeClientManager,
logger logging.Logger,
registry *prometheus.Registry,
) (*Dispatcher, error) {
if config == nil {
return nil, errors.New("config is required")
@@ -73,6 +76,7 @@
aggregator: aggregator,
nodeClientManager: nodeClientManager,
logger: logger.With("component", "Dispatcher"),
metrics: newDispatcherMetrics(registry),

cursor: nil,
}, nil
@@ -101,7 +105,6 @@ func (d *Dispatcher) Start(ctx context.Context) error {
}
continue
}

go func() {
err := d.HandleSignatures(ctx, batchData, sigChan)
if err != nil {
@@ -118,6 +121,11 @@ func (d *Dispatcher) Start(ctx context.Context) error {
}

func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, *batchData, error) {
start := time.Now()
defer func() {
d.metrics.reportHandleBatchLatency(time.Since(start))
}()

currentBlockNumber, err := d.chainState.GetCurrentBlockNumber()
if err != nil {
return nil, nil, fmt.Errorf("failed to get current block number: %w", err)
@@ -148,6 +156,8 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
continue
}

submissionStart := time.Now()

d.pool.Submit(func() {

req := &corev2.DispersalRequest{
@@ -158,14 +168,21 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
DispersedAt: uint64(time.Now().UnixNano()),
BatchHeader: *batch.BatchHeader,
}
putDispersalRequestStart := time.Now()
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
return
}

for i := 0; i < d.NumRequestRetries+1; i++ {
d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
storeErr := d.blobMetadataStore.PutDispersalResponse(ctx, &corev2.DispersalResponse{
DispersalRequest: req,
@@ -177,6 +194,8 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
d.logger.Error("failed to put dispersal response", "err", storeErr)
}

d.metrics.reportPutDispersalResponseLatency(time.Since(sendChunksFinished))

sigChan <- core.SigningMessage{
Signature: sig,
Operator: opID,
@@ -191,28 +210,42 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
d.logger.Warn("failed to send chunks", "operator", opID, "NumAttempts", i, "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}
d.metrics.reportSendChunksRetryCount(float64(i))
})

d.metrics.reportPoolSubmissionLatency(time.Since(submissionStart))
}

return sigChan, batchData, nil
}

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults))
i := 0
for quorumID := range quorumAttestation.QuorumResults {
quorums[i] = quorumID
i++
}
aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
BatchHeader: batchData.Batch.BatchHeader,
AttestedAt: uint64(time.Now().UnixNano()),
@@ -222,11 +255,15 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
Sigma: aggSig.AggSignature,
QuorumNumbers: quorums,
})
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
}
@@ -237,7 +274,14 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
// NewBatch creates a batch of blobs to dispatch
// Warning: This function is not thread-safe
func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) (*batchData, error) {
newBatchStart := time.Now()
defer func() {
d.metrics.reportNewBatchLatency(time.Since(newBatchStart))
}()

blobMetadatas, cursor, err := d.blobMetadataStore.GetBlobMetadataByStatusPaginated(ctx, v2.Encoded, d.cursor, d.MaxBatchSize)
getBlobMetadataFinished := time.Now()
d.metrics.reportGetBlobMetadataLatency(getBlobMetadataFinished.Sub(newBatchStart))
if err != nil {
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}
@@ -247,6 +291,8 @@
}

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
}
@@ -264,6 +310,8 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}

certs, _, err := d.blobMetadataStore.GetBlobCertificates(ctx, keys)
getBlobCertificatesFinished := time.Now()
d.metrics.reportGetBlobCertificatesLatency(getBlobCertificatesFinished.Sub(getOperatorStateFinished))
if err != nil {
return nil, fmt.Errorf("failed to get blob certificates: %w", err)
}
@@ -304,11 +352,15 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
copy(batchHeader.BatchRoot[:], tree.Root())

batchHeaderHash, err := batchHeader.Hash()
buildMerkleTreeFinished := time.Now()
d.metrics.reportBuildMerkleTreeLatency(buildMerkleTreeFinished.Sub(getBlobCertificatesFinished))
if err != nil {
return nil, fmt.Errorf("failed to hash batch header: %w", err)
}

err = d.blobMetadataStore.PutBatchHeader(ctx, batchHeader)
putBatchHeaderFinished := time.Now()
d.metrics.reportPutBatchHeaderLatency(putBatchHeaderFinished.Sub(buildMerkleTreeFinished))
if err != nil {
return nil, fmt.Errorf("failed to put batch header: %w", err)
}
@@ -338,13 +390,18 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
}
}

proofGenerationFinished := time.Now()
d.metrics.reportProofLatency(proofGenerationFinished.Sub(putBatchHeaderFinished))

verificationInfos := make([]*corev2.BlobVerificationInfo, len(verificationInfoMap))
i := 0
for _, v := range verificationInfoMap {
verificationInfos[i] = v
i++
}
err = d.blobMetadataStore.PutBlobVerificationInfos(ctx, verificationInfos)
putBlobVerificationInfosFinished := time.Now()
d.metrics.reportPutVerificationInfosLatency(putBlobVerificationInfosFinished.Sub(proofGenerationFinished))
if err != nil {
return nil, fmt.Errorf("failed to put blob verification infos: %w", err)
}
(The remaining 2 changed files in this commit were not loaded in this view.)
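The dispatcherMetrics type and the newDispatcherMetrics constructor referenced throughout dispatcher.go are defined in one of those unloaded files, so the following is only a sketch of the shape the call sites imply. The metric names, the eigenda_dispatcher namespace, millisecond units, and the choice of summaries plus a gauge are all assumptions, and only a representative subset of the report* methods is shown:

package controller

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// dispatcherMetrics (sketch): one field per instrumented stage. The real type
// in this commit has one report* method per call site seen in dispatcher.go.
type dispatcherMetrics struct {
	handleBatchLatency      prometheus.Summary
	sendChunksLatency       prometheus.Summary
	sendChunksRetryCount    prometheus.Gauge
	handleSignaturesLatency prometheus.Summary
}

// newLatencySummary registers and returns a latency summary with common
// quantile objectives (values assumed).
func newLatencySummary(registry *prometheus.Registry, name string, help string) prometheus.Summary {
	s := prometheus.NewSummary(prometheus.SummaryOpts{
		Namespace:  "eigenda_dispatcher", // assumed namespace
		Name:       name,
		Help:       help,
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	registry.MustRegister(s)
	return s
}

func newDispatcherMetrics(registry *prometheus.Registry) *dispatcherMetrics {
	retryCount := prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "eigenda_dispatcher",
		Name:      "send_chunks_retry_count",
		Help:      "retries used by the most recent sendChunks call",
	})
	registry.MustRegister(retryCount)

	return &dispatcherMetrics{
		handleBatchLatency:      newLatencySummary(registry, "handle_batch_latency_ms", "latency of one HandleBatch iteration"),
		sendChunksLatency:       newLatencySummary(registry, "send_chunks_latency_ms", "latency of one sendChunks call"),
		sendChunksRetryCount:    retryCount,
		handleSignaturesLatency: newLatencySummary(registry, "handle_signatures_latency_ms", "latency of one HandleSignatures call"),
	}
}

func (m *dispatcherMetrics) reportHandleBatchLatency(d time.Duration) {
	m.handleBatchLatency.Observe(float64(d.Milliseconds()))
}

func (m *dispatcherMetrics) reportSendChunksLatency(d time.Duration) {
	m.sendChunksLatency.Observe(float64(d.Milliseconds()))
}

func (m *dispatcherMetrics) reportSendChunksRetryCount(n float64) {
	m.sendChunksRetryCount.Set(n)
}

func (m *dispatcherMetrics) reportHandleSignaturesLatency(d time.Duration) {
	m.handleSignaturesLatency.Observe(float64(d.Milliseconds()))
}

Whatever the concrete metric types, the pattern in the diff is consistent: capture time.Now() before each stage and call the matching report*Latency with the elapsed duration right after it, so every stage of HandleBatch, HandleSignatures, and NewBatch gets its own latency series.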
