diff --git a/core/aggregation.go b/core/aggregation.go index 7e311f8119..4a17525748 100644 --- a/core/aggregation.go +++ b/core/aggregation.go @@ -27,7 +27,9 @@ type SigningMessage struct { Signature *Signature Operator OperatorID BatchHeaderHash [32]byte - Err error + // Undefined if this value <= 0. + AttestationLatencyMs float64 + Err error } // SignatureAggregation contains the results of aggregating signatures from a set of operators @@ -130,7 +132,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state } batchHeaderHashHex := hex.EncodeToString(r.BatchHeaderHash[:]) if r.Err != nil { - a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", batchHeaderHashHex, "err", r.Err) + a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", batchHeaderHashHex, "attestationLatencyMs", r.AttestationLatencyMs, "err", r.Err) continue } @@ -173,7 +175,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state aggPubKeys[ind].Add(op.PubkeyG2) } } - a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", batchHeaderHashHex) + a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", batchHeaderHashHex, "attestationLatencyMs", r.AttestationLatencyMs) } // Aggregate Non signer Pubkey Id diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index b9648bf808..95dcb575a2 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -68,32 +68,36 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera if !hasAnyBundles { // Operator is not part of any quorum, no need to send chunks update <- core.SigningMessage{ - Err: errors.New("operator is not part of any quorum"), - Signature: nil, - Operator: id, - BatchHeaderHash: batchHeaderHash, + Err: errors.New("operator is not part of any quorum"), + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: -1, } return } requestedAt := time.Now() sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op) + latencyMs := float64(time.Since(requestedAt).Milliseconds()) if err != nil { update <- core.SigningMessage{ - Err: err, - Signature: nil, - Operator: id, - BatchHeaderHash: batchHeaderHash, + Err: err, + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: latencyMs, } - c.metrics.ObserveLatency(false, float64(time.Since(requestedAt).Milliseconds())) + c.metrics.ObserveLatency(false, latencyMs) } else { update <- core.SigningMessage{ - Signature: sig, - Operator: id, - BatchHeaderHash: batchHeaderHash, - Err: nil, + Signature: sig, + Operator: id, + BatchHeaderHash: batchHeaderHash, + AttestationLatencyMs: latencyMs, + Err: nil, } - c.metrics.ObserveLatency(true, float64(time.Since(requestedAt).Milliseconds())) + c.metrics.ObserveLatency(true, latencyMs) } }(core.IndexedOperatorInfo{ diff --git a/node/grpc/server.go b/node/grpc/server.go index 4f4bfa13cc..57a3b8e510 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "net" @@ -16,7 +17,6 @@ import ( "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -193,10 +193,7 @@ func (s *Server) validateStoreChunkRequest(in *pb.StoreChunksRequest) error { // StoreChunks is called by dispersers to store data. func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(sec float64) { - s.node.Metrics.ObserveLatency("StoreChunks", "total", sec*1000) // make milliseconds - })) - defer timer.ObserveDuration() + start := time.Now() // Validate the request. if err := s.validateStoreChunkRequest(in); err != nil { @@ -208,21 +205,18 @@ func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*p // Record metrics. if err != nil { - s.node.Metrics.RecordRPCRequest("StoreChunks", "failure") - s.node.Logger.Error("StoreChunks failed", "err", err) + s.node.Metrics.RecordRPCRequest("StoreChunks", "failure", time.Since(start)) + s.node.Logger.Error("StoreChunks RPC failed", "duration", time.Since(start), "err", err) } else { - s.node.Metrics.RecordRPCRequest("StoreChunks", "success") - s.node.Logger.Info("StoreChunks succeeded") + s.node.Metrics.RecordRPCRequest("StoreChunks", "success", time.Since(start)) + s.node.Logger.Info("StoreChunks RPC succeeded", "duration", time.Since(start)) } return reply, err } func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksRequest) (*pb.RetrieveChunksReply, error) { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(sec float64) { - s.node.Metrics.ObserveLatency("RetrieveChunks", "total", sec*1000) // make milliseconds - })) - defer timer.ObserveDuration() + start := time.Now() if in.GetQuorumId() > core.MaxQuorumID { return nil, fmt.Errorf("invalid request: quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, in.GetQuorumId()) @@ -268,10 +262,10 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques chunks, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId())) if !ok { - s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure") + s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure", time.Since(start)) return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", batchHeaderHash, in.GetBlobIndex(), in.GetQuorumId()) } - s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success") + s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success", time.Since(start)) return &pb.RetrieveChunksReply{Chunks: chunks}, nil } diff --git a/node/metrics.go b/node/metrics.go index 212f5ddab3..a429a5513e 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -161,8 +161,9 @@ func (g *Metrics) Start() { } } -func (g *Metrics) RecordRPCRequest(method string, status string) { +func (g *Metrics) RecordRPCRequest(method string, status string, duration time.Duration) { g.AccNumRequests.WithLabelValues(method, status).Inc() + g.ObserveLatency(method, "total", float64(duration.Milliseconds())) } func (g *Metrics) RecordSocketAddressChange() {