Fix: return attestation err result to dispatcher #1011

Merged
12 changes: 12 additions & 0 deletions core/aggregation.go
@@ -273,6 +273,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated signatures. We reuse the first aggregated signature as the accumulator
var aggSig *Signature
for _, quorumID := range quorumIDs {
if quorumAttestation.AggSignature[quorumID] == nil {
a.Logger.Error("cannot aggregate signature for quorum because aggregated signature is nil", "quorumID", quorumID)
continue
}
sig := quorumAttestation.AggSignature[quorumID]
if aggSig == nil {
aggSig = &Signature{sig.G1Point.Clone()}
@@ -284,6 +288,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated public keys. We reuse the first aggregated public key as the accumulator
var aggPubKey *G2Point
for _, quorumID := range quorumIDs {
if quorumAttestation.SignersAggPubKey[quorumID] == nil {
a.Logger.Error("cannot aggregate public key for quorum because signers aggregated public key is nil", "quorumID", quorumID)
continue
}
apk := quorumAttestation.SignersAggPubKey[quorumID]
if aggPubKey == nil {
aggPubKey = apk.Clone()
@@ -315,6 +323,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In

quorumAggKeys := make(map[QuorumID]*G1Point, len(quorumIDs))
for _, quorumID := range quorumIDs {
if quorumAttestation.QuorumAggPubKey[quorumID] == nil {
a.Logger.Error("cannot aggregate public key for quorum because aggregated public key is nil", "quorumID", quorumID)
continue
}
quorumAggKeys[quorumID] = quorumAttestation.QuorumAggPubKey[quorumID]
}

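The three guards above all follow the same shape: skip a quorum whose per-quorum aggregate never materialized instead of dereferencing a nil map entry mid-fold. A minimal standalone sketch of that fold-with-nil-guard pattern; Sig, Combine, and the map layout are simplified stand-ins, not the real eigenda types:

package main

import "fmt"

// Sig stands in for the aggregated-signature type; illustrative only.
type Sig struct{ v int }

// Combine stands in for adding another aggregate into the accumulator.
func (s *Sig) Combine(other *Sig) { s.v += other.v }

func aggregate(quorumIDs []uint8, perQuorum map[uint8]*Sig) *Sig {
	var agg *Sig
	for _, q := range quorumIDs {
		if perQuorum[q] == nil {
			// A quorum with no attestation yields a nil entry; skip it
			// rather than panicking on a nil dereference.
			fmt.Printf("skipping quorum %d: nil aggregate\n", q)
			continue
		}
		if agg == nil {
			// Reuse the first non-nil aggregate as the accumulator.
			agg = &Sig{v: perQuorum[q].v}
			continue
		}
		agg.Combine(perQuorum[q])
	}
	return agg
}

func main() {
	agg := aggregate([]uint8{0, 1, 2}, map[uint8]*Sig{0: {v: 1}, 2: {v: 2}})
	fmt.Println(agg.v) // 3; quorum 1 had no aggregate and was skipped
}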
16 changes: 9 additions & 7 deletions core/v2/types.go
@@ -314,22 +314,24 @@ type Attestation struct {
QuorumNumbers []core.QuorumID
}

func (a *Attestation) ToProtobuf() *disperserpb.Attestation {
func (a *Attestation) ToProtobuf() (*disperserpb.Attestation, error) {
nonSignerPubKeys := make([][]byte, len(a.NonSignerPubKeys))
for i, p := range a.NonSignerPubKeys {
pubkeyBytes := p.Bytes()
nonSignerPubKeys[i] = pubkeyBytes[:]
}

quorumAPKs := make([][]byte, len(a.QuorumAPKs))
for i, p := range a.QuorumAPKs {
apkBytes := p.Bytes()
quorumAPKs[i] = apkBytes[:]
}

quorumNumbers := make([]uint32, len(a.QuorumNumbers))
for i, q := range a.QuorumNumbers {
quorumNumbers[i] = uint32(q)

apk, ok := a.QuorumAPKs[q]
if !ok {
return nil, fmt.Errorf("missing quorum APK for quorum %d", q)
}
apkBytes := apk.Bytes()
quorumAPKs[i] = apkBytes[:]
}

apkG2Bytes := a.APKG2.Bytes()
@@ -341,7 +343,7 @@ func (a *Attestation) ToProtobuf() *disperserpb.Attestation {
QuorumApks: quorumAPKs,
Sigma: sigmaBytes[:],
QuorumNumbers: quorumNumbers,
}
}, nil
}

type BlobVerificationInfo struct {
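Before this change the APK slice was built by iterating a.QuorumAPKs on its own, so nothing guaranteed that quorumAPKs[i] described quorumNumbers[i]; the rewrite looks each APK up by quorum number inside the QuorumNumbers loop and returns an error when one is missing. A small sketch of that aligned-slices pattern, with plain strings standing in for the real G1 points:

package main

import "fmt"

// buildAligned produces two slices whose indices correspond, failing loudly
// when a quorum has no APK instead of emitting misaligned output.
func buildAligned(quorums []uint8, apks map[uint8]string) ([]uint32, []string, error) {
	nums := make([]uint32, len(quorums))
	keys := make([]string, len(quorums))
	for i, q := range quorums {
		apk, ok := apks[q]
		if !ok {
			// Surface the gap to the caller rather than silently skewing indices.
			return nil, nil, fmt.Errorf("missing quorum APK for quorum %d", q)
		}
		nums[i] = uint32(q)
		keys[i] = apk
	}
	return nums, keys, nil
}

func main() {
	nums, keys, err := buildAligned([]uint8{0, 1}, map[uint8]string{0: "apk0", 1: "apk1"})
	fmt.Println(nums, keys, err) // [0 1] [apk0 apk1] <nil>
}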
8 changes: 7 additions & 1 deletion disperser/apiserver/get_blob_status_v2.go
@@ -84,12 +84,18 @@ func (s *DispersalServerV2) GetBlobStatus(ctx context.Context, req *pb.BlobStatu
continue
}

attestationProto, err := attestation.ToProtobuf()
if err != nil {
s.logger.Error("failed to convert attestation to protobuf", "err", err, "blobKey", blobKey.Hex())
continue
}

// return the first signed batch found
return &pb.BlobStatusReply{
Status: metadata.BlobStatus.ToProfobuf(),
SignedBatch: &pb.SignedBatch{
Header: batchHeader.ToProtobuf(),
Attestation: attestation.ToProtobuf(),
Attestation: attestationProto,
},
BlobVerificationInfo: blobVerificationInfoProto,
}, nil
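With ToProtobuf now fallible, the handler logs and skips any batch whose attestation fails to serialize and keeps scanning, still returning the first signed batch it can actually encode. A sketch of that skip-and-continue selection; batch and convert are hypothetical stand-ins for the metadata record and attestation conversion:

package main

import (
	"errors"
	"fmt"
)

type batch struct{ name string }

// convert stands in for attestation.ToProtobuf; it fails for some batches.
func convert(b batch) (string, error) {
	if b.name == "broken" {
		return "", errors.New("missing quorum APK")
	}
	return "proto:" + b.name, nil
}

// firstUsable returns the first batch that converts cleanly, mirroring
// GetBlobStatus returning the first signed batch it can serialize.
func firstUsable(batches []batch) (string, error) {
	for _, b := range batches {
		proto, err := convert(b)
		if err != nil {
			fmt.Println("skipping batch:", b.name, err)
			continue
		}
		return proto, nil
	}
	return "", errors.New("no usable signed batch")
}

func main() {
	out, err := firstUsable([]batch{{"broken"}, {"ok"}})
	fmt.Println(out, err) // proto:ok <nil>
}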
3 changes: 2 additions & 1 deletion disperser/apiserver/server_v2_test.go
@@ -387,7 +387,8 @@ func TestV2GetBlobStatus(t *testing.T) {
require.Equal(t, verificationInfo0.InclusionProof, reply.GetBlobVerificationInfo().GetInclusionProof())
require.Equal(t, batchHeader.BatchRoot[:], reply.GetSignedBatch().GetHeader().BatchRoot)
require.Equal(t, batchHeader.ReferenceBlockNumber, reply.GetSignedBatch().GetHeader().ReferenceBlockNumber)
attestationProto := attestation.ToProtobuf()
attestationProto, err := attestation.ToProtobuf()
require.NoError(t, err)
require.Equal(t, attestationProto, reply.GetSignedBatch().GetAttestation())
}

61 changes: 45 additions & 16 deletions disperser/controller/dispatcher.go
@@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
@@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
sigChan, batchData, err := d.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToDispatch) {
d.logger.Warn("no blobs to dispatch")
d.logger.Debug("no blobs to dispatch")
} else {
d.logger.Error("failed to process a batch", "err", err)
}
@@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
if err != nil {
d.logger.Error("failed to handle signatures", "err", err)
}
close(sigChan)
// TODO(ian-shim): handle errors and mark failed
}()
}
@@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: err,
}
return
}

d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
var lastErr error
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
lastErr = err
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
@@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
Signature: sig,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0, // TODO: calculate latency
AttestationLatencyMs: float64(time.Since(sendChunksStart)),
Err: nil,
}

break
}

d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err)
d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}

if lastErr != nil {
d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: lastErr,
}
}
d.metrics.reportSendChunksRetryCount(float64(i))
})

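The loop above remembers lastErr across attempts, sleeps 2^i seconds between tries, and only after exhausting the retries pushes an error-bearing SigningMessage into sigChan. A standalone sketch of that retry-then-report shape; the backoff is scaled to milliseconds here so the example runs quickly, where the dispatcher uses seconds:

package main

import (
	"errors"
	"fmt"
	"math"
	"time"
)

type result struct {
	sig string
	err error
}

// withRetries tries send up to attempts times with exponential backoff and
// always delivers exactly one result, success or terminal failure.
func withRetries(attempts int, send func() (string, error), out chan<- result) {
	var lastErr error
	for i := 0; i < attempts; i++ {
		sig, err := send()
		lastErr = err
		if err == nil {
			out <- result{sig: sig}
			return
		}
		// Backoff: 1, 2, 4, ... units before the next attempt.
		time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Millisecond)
	}
	// Report the terminal failure so the receiver is never left waiting
	// for a message that will not arrive.
	out <- result{err: lastErr}
}

func main() {
	out := make(chan result, 1)
	go withRetries(3, func() (string, error) { return "", errors.New("operator unreachable") }, out)
	fmt.Println((<-out).err) // operator unreachable
}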
@@ -221,29 +242,36 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
if batchData == nil {
return errors.New("batchData is required")
}
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:])
quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults))
i := 0
nonZeroQuorums := make([]core.QuorumID, 0)
for quorumID := range quorumAttestation.QuorumResults {
quorums[i] = quorumID
i++
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
nonZeroQuorums = append(nonZeroQuorums, quorumID)
}
aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums)
if len(nonZeroQuorums) == 0 {
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
}

aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
Expand All @@ -253,21 +281,22 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
APKG2: aggSig.AggPubKey,
QuorumAPKs: aggSig.QuorumAggPubKeys,
Sigma: aggSig.AggSignature,
QuorumNumbers: quorums,
QuorumNumbers: nonZeroQuorums,
})
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
}

d.logger.Debug("successfully processed batch", "batchHeader", batchHeaderHash)
return nil
}

@@ -286,16 +315,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}

d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas))
if len(blobMetadatas) == 0 {
return nil, errNoBlobsToDispatch
}
d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber)

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
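The thread running through this file is a channel contract: every operator goroutine now sends exactly one SigningMessage, carrying either a signature or an Err, so ReceiveSignatures can count responses instead of hanging on operators that failed before signing, and Start closes sigChan once HandleSignatures returns. A sketch of that one-message-per-worker contract with simplified types:

package main

import (
	"errors"
	"fmt"
)

type signingMessage struct {
	operator string
	err      error
}

// disperse fans out to operators. Each worker sends exactly one message,
// success or failure, so the receiver can simply count len(operators)
// results and never block forever on a worker that errored out early.
func disperse(operators []string, sigChan chan signingMessage) {
	for _, op := range operators {
		go func(op string) {
			if op == "bad" {
				sigChan <- signingMessage{operator: op, err: errors.New("dispersal failed")}
				return
			}
			sigChan <- signingMessage{operator: op}
		}(op)
	}
}

func main() {
	ops := []string{"good", "bad"}
	sigChan := make(chan signingMessage, len(ops))
	disperse(ops, sigChan)
	for range ops {
		msg := <-sigChan
		fmt.Println(msg.operator, msg.err)
	}
	close(sigChan) // mirrors the dispatcher closing sigChan after HandleSignatures returns
}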
4 changes: 3 additions & 1 deletion disperser/controller/encoding_manager.go
@@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error {
err := e.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToEncode) {
e.logger.Warn("no blobs to encode")
e.logger.Debug("no blobs to encode")
} else {
e.logger.Error("failed to process a batch", "err", err)
}
@@ -263,6 +263,8 @@ func (e *EncodingManager) HandleBatch(ctx context.Context) error {
if cursor != nil {
e.cursor = cursor
}

e.logger.Debug("successfully submitted encoding requests", "numBlobs", len(blobMetadatas))
return nil
}

11 changes: 9 additions & 2 deletions node/grpc/server_v2.go
@@ -4,6 +4,9 @@ import (
"context"
"encoding/hex"
"fmt"
"runtime"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
Expand All @@ -14,8 +17,6 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/shirou/gopsutil/mem"
"runtime"
"time"
)

// ServerV2 implements the Node v2 proto APIs.
@@ -76,6 +77,12 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}

// TODO(ian-shim): support remote signer
if s.node.KeyPair == nil {
return nil, api.NewErrorInternal("missing key pair")
}

batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err