From 39e15520150772cd4b629087ee15b3dfd7b59822 Mon Sep 17 00:00:00 2001 From: Ian Shim Date: Mon, 16 Dec 2024 16:16:18 -0800 Subject: [PATCH] return attestation err result to dispatcher --- core/aggregation.go | 12 +++++ core/v2/types.go | 16 +++--- disperser/apiserver/get_blob_status_v2.go | 8 ++- disperser/apiserver/server_v2_test.go | 3 +- disperser/controller/dispatcher.go | 60 +++++++++++++++++------ disperser/controller/encoding_manager.go | 2 +- node/grpc/server_v2.go | 11 ++++- 7 files changed, 84 insertions(+), 28 deletions(-) diff --git a/core/aggregation.go b/core/aggregation.go index d0c01c3abb..5b7f87496d 100644 --- a/core/aggregation.go +++ b/core/aggregation.go @@ -273,6 +273,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In // Aggregate the aggregated signatures. We reuse the first aggregated signature as the accumulator var aggSig *Signature for _, quorumID := range quorumIDs { + if quorumAttestation.AggSignature[quorumID] == nil { + a.Logger.Error("cannot aggregate signature for quorum because aggregated signature is nil", "quorumID", quorumID) + continue + } sig := quorumAttestation.AggSignature[quorumID] if aggSig == nil { aggSig = &Signature{sig.G1Point.Clone()} @@ -284,6 +288,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In // Aggregate the aggregated public keys. We reuse the first aggregated public key as the accumulator var aggPubKey *G2Point for _, quorumID := range quorumIDs { + if quorumAttestation.SignersAggPubKey[quorumID] == nil { + a.Logger.Error("cannot aggregate public key for quorum because signers aggregated public key is nil", "quorumID", quorumID) + continue + } apk := quorumAttestation.SignersAggPubKey[quorumID] if aggPubKey == nil { aggPubKey = apk.Clone() @@ -315,6 +323,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In quorumAggKeys := make(map[QuorumID]*G1Point, len(quorumIDs)) for _, quorumID := range quorumIDs { + if quorumAttestation.QuorumAggPubKey[quorumID] == nil { + a.Logger.Error("cannot aggregate public key for quorum because aggregated public key is nil", "quorumID", quorumID) + continue + } quorumAggKeys[quorumID] = quorumAttestation.QuorumAggPubKey[quorumID] } diff --git a/core/v2/types.go b/core/v2/types.go index f1c129a8df..8b51dfb6ed 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -314,7 +314,7 @@ type Attestation struct { QuorumNumbers []core.QuorumID } -func (a *Attestation) ToProtobuf() *disperserpb.Attestation { +func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) { nonSignerPubKeys := make([][]byte, len(a.NonSignerPubKeys)) for i, p := range a.NonSignerPubKeys { pubkeyBytes := p.Bytes() @@ -322,14 +322,16 @@ func (a *Attestation) ToProtobuf() *disperserpb.Attestation { } quorumAPKs := make([][]byte, len(a.QuorumAPKs)) - for i, p := range a.QuorumAPKs { - apkBytes := p.Bytes() - quorumAPKs[i] = apkBytes[:] - } - quorumNumbers := make([]uint32, len(a.QuorumNumbers)) for i, q := range a.QuorumNumbers { quorumNumbers[i] = uint32(q) + + apk, ok := a.QuorumAPKs[q] + if !ok { + return nil, fmt.Errorf("missing quorum APK for quorum %d", q) + } + apkBytes := apk.Bytes() + quorumAPKs[i] = apkBytes[:] } apkG2Bytes := a.APKG2.Bytes() @@ -341,7 +343,7 @@ func (a *Attestation) ToProtobuf() *disperserpb.Attestation { QuorumApks: quorumAPKs, Sigma: sigmaBytes[:], QuorumNumbers: quorumNumbers, - } + }, nil } type BlobVerificationInfo struct { diff --git a/disperser/apiserver/get_blob_status_v2.go b/disperser/apiserver/get_blob_status_v2.go index e4d32475a8..dfb4942ad3 100644 --- a/disperser/apiserver/get_blob_status_v2.go +++ b/disperser/apiserver/get_blob_status_v2.go @@ -84,12 +84,18 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu continue } + attestationProto, err := attestation.ToProtobuf() + if err != nil { + s.logger.Error("failed to convert attestation to protobuf", "err", err, "blobKey", blobKey.Hex()) + continue + } + // return the first signed batch found return &pb.BlobStatusReply{ Status: metadata.BlobStatus.ToProfobuf(), SignedBatch: &pb.SignedBatch{ Header: batchHeader.ToProtobuf(), - Attestation: attestation.ToProtobuf(), + Attestation: attestationProto, }, BlobVerificationInfo: blobVerificationInfoProto, }, nil diff --git a/disperser/apiserver/server_v2_test.go b/disperser/apiserver/server_v2_test.go index 7f95a8f3bd..0eaa4846a6 100644 --- a/disperser/apiserver/server_v2_test.go +++ b/disperser/apiserver/server_v2_test.go @@ -387,7 +387,8 @@ func TestV2GetBlobStatus(t *testing.T) { require.Equal(t, verificationInfo0.InclusionProof, reply.GetBlobVerificationInfo().GetInclusionProof()) require.Equal(t, batchHeader.BatchRoot[:], reply.GetSignedBatch().GetHeader().BatchRoot) require.Equal(t, batchHeader.ReferenceBlockNumber, reply.GetSignedBatch().GetHeader().ReferenceBlockNumber) - attestationProto := attestation.ToProtobuf() + attestationProto, err := attestation.ToProtobuf() + require.NoError(t, err) require.Equal(t, attestationProto, reply.GetSignedBatch().GetAttestation()) } diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index f48a9783ff..366ecad4e8 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/hex" "errors" "fmt" "math" @@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { sigChan, batchData, err := d.HandleBatch(ctx) if err != nil { if errors.Is(err, errNoBlobsToDispatch) { - d.logger.Warn("no blobs to dispatch") + d.logger.Debug("no blobs to dispatch") } else { d.logger.Error("failed to process a batch", "err", err) } @@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { if err != nil { d.logger.Error("failed to handle signatures", "err", err) } + close(sigChan) // TODO(ian-shim): handle errors and mark failed }() } @@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, err := d.blobMetadataStore.PutDispersalRequest(ctx, req) if err != nil { d.logger.Error("failed to put dispersal request", "err", err) + sigChan <- core.SigningMessage{ + Signature: nil, + Operator: opID, + BatchHeaderHash: batchData.BatchHeaderHash, + AttestationLatencyMs: 0, + Err: err, + } return } d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart)) var i int + var lastErr error for i = 0; i < d.NumRequestRetries+1; i++ { sendChunksStart := time.Now() sig, err := d.sendChunks(ctx, client, batch) + lastErr = err sendChunksFinished := time.Now() d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart)) if err == nil { @@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, Signature: sig, Operator: opID, BatchHeaderHash: batchData.BatchHeaderHash, - AttestationLatencyMs: 0, // TODO: calculate latency + AttestationLatencyMs: float64(time.Since(sendChunksStart)), Err: nil, } - break } - d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err) + d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err) time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying } + + if lastErr != nil { + d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr) + sigChan <- core.SigningMessage{ + Signature: nil, + Operator: opID, + BatchHeaderHash: batchData.BatchHeaderHash, + AttestationLatencyMs: 0, + Err: lastErr, + } + } d.metrics.reportSendChunksRetryCount(float64(i)) }) @@ -221,29 +242,36 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, // HandleSignatures receives signatures from operators, validates, and aggregates them func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error { + if batchData == nil { + return errors.New("batchData is required") + } handleSignaturesStart := time.Now() defer func() { d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart)) }() + batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:]) quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan) if err != nil { - return fmt.Errorf("failed to receive and validate signatures: %w", err) + return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err) } receiveSignaturesFinished := time.Now() d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart)) - quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults)) - i := 0 + nonZeroQuorums := make([]core.QuorumID, 0) for quorumID := range quorumAttestation.QuorumResults { - quorums[i] = quorumID - i++ + d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID]) + nonZeroQuorums = append(nonZeroQuorums, quorumID) } - aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums) + if len(nonZeroQuorums) == 0 { + return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash) + } + + aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums) aggregateSignaturesFinished := time.Now() d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished)) if err != nil { - return fmt.Errorf("failed to aggregate signatures: %w", err) + return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err) } err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{ @@ -253,19 +281,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, APKG2: aggSig.AggPubKey, QuorumAPKs: aggSig.QuorumAggPubKeys, Sigma: aggSig.AggSignature, - QuorumNumbers: quorums, + QuorumNumbers: nonZeroQuorums, }) putAttestationFinished := time.Now() d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished)) if err != nil { - return fmt.Errorf("failed to put attestation: %w", err) + return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err) } err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified) updateBatchStatusFinished := time.Now() d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished)) if err != nil { - return fmt.Errorf("failed to mark blobs as certified: %w", err) + return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err) } return nil @@ -286,16 +314,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) return nil, fmt.Errorf("failed to get blob metadata by status: %w", err) } - d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas)) if len(blobMetadatas) == 0 { return nil, errNoBlobsToDispatch } + d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber) state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber) getOperatorStateFinished := time.Now() d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished)) if err != nil { - return nil, fmt.Errorf("failed to get operator state: %w", err) + return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err) } keys := make([]corev2.BlobKey, len(blobMetadatas)) diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index bc6a0c9355..1844b48daf 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error { err := e.HandleBatch(ctx) if err != nil { if errors.Is(err, errNoBlobsToEncode) { - e.logger.Warn("no blobs to encode") + e.logger.Debug("no blobs to encode") } else { e.logger.Error("failed to process a batch", "err", err) } diff --git a/node/grpc/server_v2.go b/node/grpc/server_v2.go index 620267cd02..9e7f3508f4 100644 --- a/node/grpc/server_v2.go +++ b/node/grpc/server_v2.go @@ -4,6 +4,9 @@ import ( "context" "encoding/hex" "fmt" + "runtime" + "time" + "github.com/Layr-Labs/eigenda/api" pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2" "github.com/Layr-Labs/eigenda/common" @@ -14,8 +17,6 @@ import ( "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/shirou/gopsutil/mem" - "runtime" - "time" ) // ServerV2 implements the Node v2 proto APIs. @@ -76,6 +77,12 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) ( if s.node.StoreV2 == nil { return nil, api.NewErrorInternal("v2 store not initialized") } + + // TODO(ian-shim): support remote signer + if s.node.KeyPair == nil { + return nil, api.NewErrorInternal("missing key pair") + } + batch, err := s.validateStoreChunksRequest(in) if err != nil { return nil, err