Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log e2e attestation latency #569

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ type SigningMessage struct {
Signature *Signature
Operator OperatorID
BatchHeaderHash [32]byte
Err error
// Undefined if this value <= 0.
AttestationLatencyMs float64
Err error
}

// SignatureAggregation contains the results of aggregating signatures from a set of operators
Expand Down Expand Up @@ -130,7 +132,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
}
batchHeaderHashHex := hex.EncodeToString(r.BatchHeaderHash[:])
if r.Err != nil {
a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", batchHeaderHashHex, "err", r.Err)
a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", batchHeaderHashHex, "attestationLatencyMs", r.AttestationLatencyMs, "err", r.Err)
continue
}

Expand Down Expand Up @@ -173,7 +175,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
aggPubKeys[ind].Add(op.PubkeyG2)
}
}
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", batchHeaderHashHex)
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", batchHeaderHashHex, "attestationLatencyMs", r.AttestationLatencyMs)
}

// Aggregate Non signer Pubkey Id
Expand Down
32 changes: 18 additions & 14 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,36 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
if !hasAnyBundles {
// Operator is not part of any quorum, no need to send chunks
update <- core.SigningMessage{
Err: errors.New("operator is not part of any quorum"),
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
Err: errors.New("operator is not part of any quorum"),
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: -1,
}
return
}

requestedAt := time.Now()
sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op)
latencyMs := float64(time.Since(requestedAt).Milliseconds())
if err != nil {
update <- core.SigningMessage{
Err: err,
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
Err: err,
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: latencyMs,
}
c.metrics.ObserveLatency(false, float64(time.Since(requestedAt).Milliseconds()))
c.metrics.ObserveLatency(false, latencyMs)
} else {
update <- core.SigningMessage{
Signature: sig,
Operator: id,
BatchHeaderHash: batchHeaderHash,
Err: nil,
Signature: sig,
Operator: id,
BatchHeaderHash: batchHeaderHash,
AttestationLatencyMs: latencyMs,
Err: nil,
}
c.metrics.ObserveLatency(true, float64(time.Since(requestedAt).Milliseconds()))
c.metrics.ObserveLatency(true, latencyMs)
}

}(core.IndexedOperatorInfo{
Expand Down
24 changes: 9 additions & 15 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"net"

Expand All @@ -16,7 +17,6 @@ import (
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -193,10 +193,7 @@ func (s *Server) validateStoreChunkRequest(in *pb.StoreChunksRequest) error {

// StoreChunks is called by dispersers to store data.
func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*pb.StoreChunksReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(sec float64) {
s.node.Metrics.ObserveLatency("StoreChunks", "total", sec*1000) // make milliseconds
}))
defer timer.ObserveDuration()
start := time.Now()

// Validate the request.
if err := s.validateStoreChunkRequest(in); err != nil {
Expand All @@ -208,21 +205,18 @@ func (s *Server) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (*p

// Record metrics.
if err != nil {
s.node.Metrics.RecordRPCRequest("StoreChunks", "failure")
s.node.Logger.Error("StoreChunks failed", "err", err)
s.node.Metrics.RecordRPCRequest("StoreChunks", "failure", time.Since(start))
s.node.Logger.Error("StoreChunks RPC failed", "duration", time.Since(start), "err", err)
} else {
s.node.Metrics.RecordRPCRequest("StoreChunks", "success")
s.node.Logger.Info("StoreChunks succeeded")
s.node.Metrics.RecordRPCRequest("StoreChunks", "success", time.Since(start))
s.node.Logger.Info("StoreChunks RPC succeeded", "duration", time.Since(start))
}

return reply, err
}

func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksRequest) (*pb.RetrieveChunksReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(sec float64) {
s.node.Metrics.ObserveLatency("RetrieveChunks", "total", sec*1000) // make milliseconds
}))
defer timer.ObserveDuration()
start := time.Now()

if in.GetQuorumId() > core.MaxQuorumID {
return nil, fmt.Errorf("invalid request: quorum ID must be in range [0, %d], but found %d", core.MaxQuorumID, in.GetQuorumId())
Expand Down Expand Up @@ -268,10 +262,10 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques

chunks, ok := s.node.Store.GetChunks(ctx, batchHeaderHash, int(in.GetBlobIndex()), uint8(in.GetQuorumId()))
if !ok {
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure")
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "failure", time.Since(start))
return nil, fmt.Errorf("could not find chunks for batchHeaderHash %v, blob index: %v, quorumID: %v", batchHeaderHash, in.GetBlobIndex(), in.GetQuorumId())
}
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success")
s.node.Metrics.RecordRPCRequest("RetrieveChunks", "success", time.Since(start))
return &pb.RetrieveChunksReply{Chunks: chunks}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func (g *Metrics) Start() {
}
}

func (g *Metrics) RecordRPCRequest(method string, status string) {
// RecordRPCRequest records one completed RPC: it increments the request
// counter labeled by method and status ("success"/"failure" at call sites
// visible in this diff), and records the request's end-to-end duration,
// converted to milliseconds, via ObserveLatency under the "total" stage.
func (g *Metrics) RecordRPCRequest(method string, status string, duration time.Duration) {
	g.AccNumRequests.WithLabelValues(method, status).Inc()
	// float64 milliseconds to match ObserveLatency's unit (see the
	// "make milliseconds" conversion this PR removes from server.go).
	g.ObserveLatency(method, "total", float64(duration.Milliseconds()))
}

func (g *Metrics) RecordSocketAddressChange() {
Expand Down
Loading