Skip to content

Commit

Permalink
Add logging for batch header hash for batcher-node (#545)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored May 8, 2024
1 parent d2712a8 commit 83ef8bd
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 39 deletions.
17 changes: 9 additions & 8 deletions core/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ var (
ErrAggSigNotValid = errors.New("aggregated signature is not valid")
)

type SignerMessage struct {
Signature *Signature
Operator OperatorID
Err error
type SigningMessage struct {
Signature *Signature
Operator OperatorID
BatchHeaderHash [32]byte
Err error
}

// SignatureAggregation contains the results of aggregating signatures from a set of operators
Expand Down Expand Up @@ -53,7 +54,7 @@ type SignatureAggregator interface {

// AggregateSignatures blocks until it receives a response for each operator in the operator state via messageChan, and then returns the aggregated signature.
// If the aggregated signature is invalid, an error is returned.
AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error)
AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SigningMessage) (*SignatureAggregation, error)
}

type StdSignatureAggregator struct {
Expand All @@ -78,7 +79,7 @@ func NewStdSignatureAggregator(logger logging.Logger, transactor Transactor) (*S

var _ SignatureAggregator = (*StdSignatureAggregator)(nil)

func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SignerMessage) (*SignatureAggregation, error) {
func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state *IndexedOperatorState, quorumIDs []QuorumID, message [32]byte, messageChan chan SigningMessage) (*SignatureAggregation, error) {
// TODO: Add logging

if len(quorumIDs) == 0 {
Expand Down Expand Up @@ -127,7 +128,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
socket = op.Socket
}
if r.Err != nil {
a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "err", r.Err)
a.Logger.Warn("error returned from messageChan", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "batchHeaderHash", r.BatchHeaderHash, "err", r.Err)
continue
}

Expand Down Expand Up @@ -170,7 +171,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
aggPubKeys[ind].Add(op.PubkeyG2)
}
}
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums))
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums), "batchHeaderHash", r.BatchHeaderHash)
}

// Aggregate Non signer Pubkey Id
Expand Down
10 changes: 5 additions & 5 deletions core/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update chan core.SignerMessage, advCount uint) {
func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update chan core.SigningMessage, advCount uint) {

count := 0

Expand All @@ -54,13 +54,13 @@ func simulateOperators(state mock.PrivateOperatorState, message [32]byte, update
op := state.PrivateOperators[id]
sig := op.KeyPair.SignMessage(message)
if count < len(state.IndexedOperators)-int(advCount) {
update <- core.SignerMessage{
update <- core.SigningMessage{
Signature: sig,
Operator: id,
Err: nil,
}
} else {
update <- core.SignerMessage{
update <- core.SigningMessage{
Signature: nil,
Operator: id,
Err: errors.New("adversary"),
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestAggregateSignaturesStatus(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
state := dat.GetTotalOperatorStateWithQuorums(context.Background(), 0, []core.QuorumID{0, 1})

update := make(chan core.SignerMessage)
update := make(chan core.SigningMessage)
message := [32]byte{1, 2, 3, 4, 5, 6}

go simulateOperators(*state, message, update, tt.adversaryCount)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestSortNonsigners(t *testing.T) {

state := dat.GetTotalOperatorState(context.Background(), 0)

update := make(chan core.SignerMessage)
update := make(chan core.SigningMessage)
message := [32]byte{1, 2, 3, 4, 5, 6}

go simulateOperators(*state, message, update, 4)
Expand Down
37 changes: 22 additions & 15 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,24 @@ func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.Dispatch

var _ disperser.Dispatcher = (*dispatcher)(nil)

func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SignerMessage {
update := make(chan core.SignerMessage, len(state.IndexedOperators))
func (c *dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader) chan core.SigningMessage {
update := make(chan core.SigningMessage, len(state.IndexedOperators))

// Disperse
c.sendAllChunks(ctx, state, blobs, batchHeader, update)

return update
}

func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SignerMessage) {
func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, batchHeader *core.BatchHeader, update chan core.SigningMessage) {
for id, op := range state.IndexedOperators {
go func(op core.IndexedOperatorInfo, id core.OperatorID) {
blobMessages := make([]*core.BlobMessage, 0)
hasAnyBundles := false
batchHeaderHash, err := batchHeader.GetBatchHeaderHash()
if err != nil {
return
}
for _, blob := range blobs {
if _, ok := blob.BundlesByOperator[id]; ok {
hasAnyBundles = true
Expand All @@ -63,28 +67,31 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
}
if !hasAnyBundles {
// Operator is not part of any quorum, no need to send chunks
update <- core.SignerMessage{
Err: errors.New("operator is not part of any quorum"),
Signature: nil,
Operator: id,
update <- core.SigningMessage{
Err: errors.New("operator is not part of any quorum"),
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
}
return
}

requestedAt := time.Now()
sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op)
if err != nil {
update <- core.SignerMessage{
Err: err,
Signature: nil,
Operator: id,
update <- core.SigningMessage{
Err: err,
Signature: nil,
Operator: id,
BatchHeaderHash: batchHeaderHash,
}
c.metrics.ObserveLatency(false, float64(time.Since(requestedAt).Milliseconds()))
} else {
update <- core.SignerMessage{
Signature: sig,
Operator: id,
Err: nil,
update <- core.SigningMessage{
Signature: sig,
Operator: id,
BatchHeaderHash: batchHeaderHash,
Err: nil,
}
c.metrics.ObserveLatency(true, float64(time.Since(requestedAt).Milliseconds()))
}
Expand Down
2 changes: 1 addition & 1 deletion disperser/disperser.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type BlobStore interface {
}

type Dispatcher interface {
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SignerMessage
DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SigningMessage
}

// GenerateReverseIndexKey returns the key used to store the blob key in the reverse index
Expand Down
8 changes: 4 additions & 4 deletions disperser/mock/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ func NewDispatcher(state *mock.PrivateOperatorState) disperser.Dispatcher {
}
}

func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SignerMessage {
update := make(chan core.SignerMessage)
func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOperatorState, blobs []core.EncodedBlob, header *core.BatchHeader) chan core.SigningMessage {
update := make(chan core.SigningMessage)
message, err := header.GetBatchHeaderHash()
if err != nil {
for id := range d.state.PrivateOperators {
update <- core.SignerMessage{
update <- core.SigningMessage{
Signature: nil,
Operator: id,
Err: err,
Expand All @@ -37,7 +37,7 @@ func (d *Dispatcher) DisperseBatch(ctx context.Context, state *core.IndexedOpera
for id, op := range d.state.PrivateOperators {
sig := op.KeyPair.SignMessage(message)

update <- core.SignerMessage{
update <- core.SigningMessage{
Signature: sig,
Operator: id,
Err: nil,
Expand Down
12 changes: 6 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,13 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
start := time.Now()
log := n.Logger

log.Debug("Processing batch", "num of blobs", len(blobs))
batchHeaderHash, err := header.GetBatchHeaderHash()
if err != nil {
return nil, err
}

if len(blobs) == 0 {
return nil, errors.New("ProcessBatch: number of blobs must be greater than zero")
return nil, errors.New("number of blobs must be greater than zero")
}

if len(blobs) != len(rawBlobs) {
Expand All @@ -301,10 +304,7 @@ func (n *Node) ProcessBatch(ctx context.Context, header *core.BatchHeader, blobs
}
n.Metrics.AcceptBatches("received", batchSize)

batchHeaderHash, err := header.GetBatchHeaderHash()
if err != nil {
return nil, err
}
log.Debug("Start processing a batch", "batchHeaderHash", batchHeaderHash, "batchSize (in bytes)", batchSize, "num of blobs", len(blobs), "referenceBlockNumber", header.ReferenceBlockNumber)

// Store the batch.
// Run this in a goroutine so we can parallelize the batch storing and batch
Expand Down

0 comments on commit 83ef8bd

Please sign in to comment.