From 83ef8bdd5eb38426df44b6ab258f9f4e96bf8369 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Wed, 8 May 2024 15:53:35 -0700 Subject: [PATCH] Add logging for batch header hash for batcher-node (#545) --- core/aggregation.go | 17 +++++++------ core/aggregation_test.go | 10 ++++---- disperser/batcher/grpc/dispatcher.go | 37 +++++++++++++++++----------- disperser/disperser.go | 2 +- disperser/mock/dispatcher.go | 8 +++--- node/node.go | 12 ++++----- 6 files changed, 47 insertions(+), 39 deletions(-) diff --git a/core/aggregation.go b/core/aggregation.go index 3e156d24ff..2a023e9bb7 100644 --- a/core/aggregation.go +++ b/core/aggregation.go @@ -22,10 +22,11 @@ var ( ErrAggSigNotValid = errors.New("aggregated signature is not valid") ) -type SignerMessage struct { - Signature *Signature - Operator OperatorID - Err error +type SigningMessage struct { + Signature *Signature + Operator OperatorID + BatchHeaderHash [32]byte + Err error } // SignatureAggregation contains the results of aggregating signatures from a set of operators @@ -53,7 +54,7 @@ type SignatureAggregator interface { // AggregateSignatures blocks until it receives a response for each operator in the operator state via messageChan, and then returns the aggregated signature. // If the aggregated signature is invalid, an error is returned. - AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error) + AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SigningMessage) (*SignatureAggregation, error) } type StdSignatureAggregator struct { @@ -78,7 +79,7 @@ func NewStdSignatureAggregator(logger logging.Logger, transactor Transactor) (*S var _ SignatureAggregator = (*StdSignatureAggregator)(nil) -func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error) { +func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SigningMessage) (*SignatureAggregation, error) { // TODO: Add logging if len(quorumIDs) == 0 { @@ -127,7 +128,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state socket = op.Socket } if r.Err != nil { - a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "err", r.Err) + a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", r.BatchHeaderHash, "err", r.Err) continue } @@ -170,7 +171,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state aggPubKeys[ind].Add(op.PubkeyG2) } } - a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums)) + a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", r.BatchHeaderHash) } // Aggregate Non signer Pubkey Id diff --git a/core/aggregation_test.go b/core/aggregation_test.go index 20ae2e45ed..61d722cc5e 100644 --- a/core/aggregation_test.go +++ b/core/aggregation_test.go @@ -42,7 +42,7 @@ func TestMain(m *testing.M) { os.Exit(code) } -func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update chan core.SignerMessage, advCount uint) { +func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update chan core.SigningMessage, advCount uint) { count := 0 @@ -54,13 +54,13 @@ func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update op := state.PrivateOperators[id] sig := op.KeyPair.SignMessage(message) if count < len(state.IndexedOperators)-int(advCount) { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: sig, Operator: id, Err: nil, } } else { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: nil, Operator: id, Err: errors.New("adversary"), @@ -154,7 +154,7 @@ func TestAggregateSignaturesStatus(t *testing.T) { t.Run(tt.name, func(t *testing.T) { state := dat.GetTotalOperatorStateWithQuorums(context.Background(), 0, []core.QuorumID{0, 1}) - update := make(chan core.SignerMessage) + update := make(chan core.SigningMessage) message := [32]byte{1, 2, 3, 4, 5, 6} go simulateOperators(*state, message, update, tt.adversaryCount) @@ -183,7 +183,7 @@ func TestSortNonsigners(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) - update := make(chan core.SignerMessage) + update := make(chan core.SigningMessage) message := [32]byte{1, 2, 3, 4, 5, 6} go simulateOperators(*state, message, update, 4) diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 4591c854f1..b9648bf808 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -37,8 +37,8 @@ func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.Dispatch var _ disperser.Dispatcher = (*dispatcher)(nil) -func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SignerMessage { - update := make(chan core.SignerMessage, len(state.IndexedOperators)) +func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage { + update := make(chan core.SigningMessage, len(state.IndexedOperators)) // Disperse c.sendAllChunks(ctx, state, blobs, batchHeader, update) @@ -46,11 +46,15 @@ func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera return update } -func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SignerMessage) { +func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) { for id, op := range state.IndexedOperators { go func(op core.IndexedOperatorInfo, id core.OperatorID) { blobMessages := make([]*core.BlobMessage, 0) hasAnyBundles := false + batchHeaderHash, err := batchHeader.GetBatchHeaderHash() + if err != nil { + return + } for _, blob := range blobs { if _, ok := blob.BundlesByOperator[id]; ok { hasAnyBundles = true @@ -63,10 +67,11 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera } if !hasAnyBundles { // Operator is not part of any quorum, no need to send chunks - update <- core.SignerMessage{ - Err: errors.New("operator is not part of any quorum"), - Signature: nil, - Operator: id, + update <- core.SigningMessage{ + Err: errors.New("operator is not part of any quorum"), + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, } return } @@ -74,17 +79,19 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera requestedAt := time.Now() sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op) if err != nil { - update <- core.SignerMessage{ - Err: err, - Signature: nil, - Operator: id, + update <- core.SigningMessage{ + Err: err, + Signature: nil, + Operator: id, + BatchHeaderHash: batchHeaderHash, } c.metrics.ObserveLatency(false, float64(time.Since(requestedAt).Milliseconds())) } else { - update <- core.SignerMessage{ - Signature: sig, - Operator: id, - Err: nil, + update <- core.SigningMessage{ + Signature: sig, + Operator: id, + BatchHeaderHash: batchHeaderHash, + Err: nil, } c.metrics.ObserveLatency(true, float64(time.Since(requestedAt).Milliseconds())) } diff --git a/disperser/disperser.go b/disperser/disperser.go index 004683a888..4b16b2933c 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -174,7 +174,7 @@ type BlobStore interface { } type Dispatcher interface { - DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SignerMessage + DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage } // GenerateReverseIndexKey returns the key used to store the blob key in the reverse index diff --git a/disperser/mock/dispatcher.go b/disperser/mock/dispatcher.go index 33dd9bd59e..980bf0c9fd 100644 --- a/disperser/mock/dispatcher.go +++ b/disperser/mock/dispatcher.go @@ -20,12 +20,12 @@ func NewDispatcher(state *mock.PrivateOperatorState) disperser.Dispatcher { } } -func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SignerMessage { - update := make(chan core.SignerMessage) +func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage { + update := make(chan core.SigningMessage) message, err := header.GetBatchHeaderHash() if err != nil { for id := range d.state.PrivateOperators { - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: nil, Operator: id, Err: err, @@ -37,7 +37,7 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera for id, op := range d.state.PrivateOperators { sig := op.KeyPair.SignMessage(message) - update <- core.SignerMessage{ + update <- core.SigningMessage{ Signature: sig, Operator: id, Err: nil, diff --git a/node/node.go b/node/node.go index 010b76cf6d..253e0b9599 100644 --- a/node/node.go +++ b/node/node.go @@ -281,10 +281,13 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs start := time.Now() log := n.Logger - log.Debug("Processing batch", "num of blobs", len(blobs)) + batchHeaderHash, err := header.GetBatchHeaderHash() + if err != nil { + return nil, err + } if len(blobs) == 0 { - return nil, errors.New("ProcessBatch: number of blobs must be greater than zero") + return nil, errors.New("number of blobs must be greater than zero") } if len(blobs) != len(rawBlobs) { @@ -301,10 +304,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs } n.Metrics.AcceptBatches("received", batchSize) - batchHeaderHash, err := header.GetBatchHeaderHash() - if err != nil { - return nil, err - } + log.Debug("Start processing a batch", "batchHeaderHash", batchHeaderHash, "batchSize (in bytes)", batchSize, "num of blobs", len(blobs), "referenceBlockNumber", header.ReferenceBlockNumber) // Store the batch. // Run this in a goroutine so we can parallelize the batch storing and batch