feat(da): improve logging #871

Merged · 3 commits · May 16, 2024
2 changes: 1 addition & 1 deletion block/manager.go
@@ -192,6 +192,6 @@ func (m *Manager) syncBlockManager() error {
 // UpdateSyncParams updates the sync target and state index if necessary
 func (m *Manager) UpdateSyncParams(endHeight uint64) {
 	types.RollappHubHeightGauge.Set(float64(endHeight))
-	m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
+	m.logger.Info("SyncTarget updated", "syncTarget", endHeight)
 	m.SyncTarget.Store(endHeight)
 }
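A note on the logging convention this PR standardizes on: dymint's logger takes a constant message followed by alternating key-value pairs, in the Tendermint keyvals style. A minimal sketch of the same pattern using Go's standard log/slog (Go 1.21+), purely for illustration; slog is not the logger dymint actually uses:

package main

import (
	"log/slog"
	"os"
)

func main() {
	// Structured logger emitting key=value pairs, analogous to
	// m.logger.Info("SyncTarget updated", "syncTarget", endHeight).
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	endHeight := uint64(1234) // hypothetical sync target
	logger.Info("SyncTarget updated", "syncTarget", endHeight)
}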
14 changes: 7 additions & 7 deletions block/produce.go
@@ -17,7 +17,7 @@ import (

 // ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
 func (m *Manager) ProduceBlockLoop(ctx context.Context) {
-	m.logger.Debug("Started produce loop")
+	m.logger.Info("Started block producer loop.")

 	ticker := time.NewTicker(m.Conf.BlockTime)
 	defer ticker.Stop()
@@ -37,18 +37,18 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {

 		block, commit, err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
 		if errors.Is(err, context.Canceled) {
-			m.logger.Error("produce and gossip: context canceled", "error", err)
+			m.logger.Error("Produce and gossip: context canceled.", "error", err)
 			return
 		}
 		if errors.Is(err, types.ErrSkippedEmptyBlock) {
 			continue
 		}
 		if errors.Is(err, ErrNonRecoverable) {
-			m.logger.Error("produce and gossip: non-recoverable", "error", err) // TODO: flush? or don't log at all?
+			m.logger.Error("Produce and gossip: non-recoverable.", "error", err) // TODO: flush? or don't log at all?
 			panic(fmt.Errorf("produce and gossip block: %w", err))
 		}
 		if err != nil {
-			m.logger.Error("produce and gossip: uncategorized, assuming recoverable", "error", err)
+			m.logger.Error("Produce and gossip: uncategorized, assuming recoverable.", "error", err)
 			continue
 		}

@@ -59,7 +59,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
 		if 0 < len(block.Data.Txs) {
 			nextEmptyBlock = time.Now().Add(m.Conf.MaxProofTime)
 		} else {
-			m.logger.Info("produced empty block")
+			m.logger.Info("Produced empty block.")
 		}

 		// Send the size to the accumulated size channel
@@ -122,7 +122,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
 		if err != nil {
 			return nil, nil, fmt.Errorf("load commit after load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
 		}
-		m.logger.Info("using pending block", "height", newHeight)
+		m.logger.Info("Using pending block.", "height", newHeight)
 	} else if !errors.Is(err, store.ErrKeyNotFound) {
 		return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
 	} else {
@@ -163,7 +163,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
 		return nil, nil, fmt.Errorf("apply block: %w: %w", err, ErrNonRecoverable)
 	}

-	m.logger.Info("block created", "height", newHeight, "num_tx", len(block.Data.Txs))
+	m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs))
 	types.RollappBlockSizeBytesGauge.Set(float64(len(block.Data.Txs)))
 	types.RollappBlockSizeTxsGauge.Set(float64(len(block.Data.Txs)))
 	types.RollappHeightGauge.Set(float64(newHeight))
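The producer loop above dispatches on sentinel errors: a canceled context ends the loop, a skipped empty block continues, a non-recoverable error panics, and anything uncategorized is assumed recoverable. A runnable sketch of that dispatch shape; the sentinels and produce function are hypothetical stand-ins, not dymint's types:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"
)

var (
	errSkippedEmptyBlock = errors.New("skipped empty block") // stand-in for types.ErrSkippedEmptyBlock
	errNonRecoverable    = errors.New("non-recoverable")     // stand-in for ErrNonRecoverable
)

// produce is a hypothetical stand-in for ProduceAndGossipBlock.
func produce(i int) error {
	switch i % 3 {
	case 0:
		return errSkippedEmptyBlock
	case 1:
		return fmt.Errorf("transient network hiccup") // uncategorized, treated as recoverable
	default:
		return nil
	}
}

func produceLoop(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		err := produce(i)
		switch {
		case errors.Is(err, context.Canceled):
			log.Println("context canceled:", err)
			return
		case errors.Is(err, errSkippedEmptyBlock):
			continue // nothing to publish this tick
		case errors.Is(err, errNonRecoverable):
			panic(fmt.Errorf("produce block: %w", err)) // crash on corruption-class errors
		case err != nil:
			log.Println("uncategorized, assuming recoverable:", err)
			continue
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	produceLoop(ctx)
}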
3 changes: 2 additions & 1 deletion block/submit.go
@@ -124,7 +124,7 @@ func (m *Manager) HandleSubmissionTrigger() error {
 	}

 	resultSubmitToDA := m.DAClient.SubmitBatch(nextBatch)
-
+	m.logger.Info("Submitted batch to DA", "start height", nextBatch.StartHeight, "end height", nextBatch.EndHeight)
 	if resultSubmitToDA.Code != da.StatusSuccess {
 		return fmt.Errorf("submit next batch to da: %s", resultSubmitToDA.Message)
 	}
@@ -135,6 +135,7 @@ func (m *Manager) HandleSubmissionTrigger() error {
 	if err != nil {
 		return fmt.Errorf("sl client submit batch: start height: %d: inclusive end height: %d: %w", startHeight, actualEndHeight, err)
 	}
+	m.logger.Info("Submitted batch to SL.", "start height", startHeight, "end height", actualEndHeight)

 	m.UpdateSyncParams(actualEndHeight)
 	return nil
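HandleSubmissionTrigger orders the two submissions deliberately: the batch goes to the DA layer first, the result code is checked, and only then is the batch posted to the settlement layer before the sync target advances. A condensed sketch of that ordering with simplified stand-in types (none of these are dymint's real signatures):

package main

import (
	"errors"
	"fmt"
)

// daResult is a simplified stand-in for dymint's DA submission result.
type daResult struct {
	OK      bool
	Message string
}

func submitToDA(start, end uint64) daResult { return daResult{OK: true} }
func submitToSL(start, end uint64) error    { return nil }

func handleSubmissionTrigger(start, end uint64) (uint64, error) {
	// DA first: the settlement layer must only see batches whose data is available.
	res := submitToDA(start, end)
	fmt.Println("Submitted batch to DA", "start height", start, "end height", end)
	if !res.OK {
		return 0, errors.New("submit next batch to da: " + res.Message)
	}
	if err := submitToSL(start, end); err != nil {
		return 0, fmt.Errorf("sl client submit batch: %w", err)
	}
	fmt.Println("Submitted batch to SL.", "start height", start, "end height", end)
	return end, nil // caller advances the sync target to the inclusive end height
}

func main() {
	if h, err := handleSubmissionTrigger(1, 10); err == nil {
		fmt.Println("sync target updated to", h)
	}
}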
50 changes: 24 additions & 26 deletions da/celestia/celestia.go
@@ -125,11 +125,11 @@ func createConfig(bz []byte) (c Config, err error) {

 // Start prepares DataAvailabilityLayerClient to work.
 func (c *DataAvailabilityLayerClient) Start() (err error) {
-	c.logger.Info("starting Celestia Data Availability Layer Client")
+	c.logger.Info("Starting Celestia Data Availability Layer Client.")

 	// other client has already been set
 	if c.rpc != nil {
-		c.logger.Debug("celestia-node client already set")
+		c.logger.Info("Celestia-node client already set.")
 		return nil
 	}

@@ -144,7 +144,7 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
 	}

 	if !state.Finished() {
-		c.logger.Info("waiting for celestia-node to finish syncing", "height", state.Height, "target", state.ToHeight)
+		c.logger.Info("Waiting for celestia-node to finish syncing.", "height", state.Height, "target", state.ToHeight)

 		done := make(chan error, 1)
 		go func() {
@@ -166,20 +166,20 @@ func (c *DataAvailabilityLayerClient) Start() (err error) {
 				if err != nil {
 					return err
 				}
-				c.logger.Info("celestia-node still syncing", "height", state.Height, "target", state.ToHeight)
+				c.logger.Info("Celestia-node still syncing.", "height", state.Height, "target", state.ToHeight)
 			}
 		}
 	}

-	c.logger.Info("celestia-node is synced", "height", state.ToHeight)
+	c.logger.Info("Celestia-node is synced.", "height", state.ToHeight)

 	c.rpc = NewOpenRPC(rpc)
 	return nil
 }

 // Stop stops DataAvailabilityLayerClient.
 func (c *DataAvailabilityLayerClient) Stop() error {
-	c.logger.Info("stopping Celestia Data Availability Layer Client")
+	c.logger.Info("Stopping Celestia Data Availability Layer Client.")
 	err := c.pubsubServer.Stop()
 	if err != nil {
 		return err
@@ -228,7 +228,7 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
 			// TODO(srene): Split batch in multiple blobs if necessary if supported
 			height, commitment, err := c.submit(data)
 			if err != nil {
-				c.logger.Error("submit batch", "error", err)
+				c.logger.Error("Submit blob.", "error", err)
 				backoff.Sleep()
 				continue
 			}
@@ -240,19 +240,19 @@ func (c *DataAvailabilityLayerClient) SubmitBatch(batch *types.Batch) da.ResultS
 				Namespace: c.config.NamespaceID.Bytes(),
 			}

-			c.logger.Info("submitted DA batch")
+			c.logger.Debug("Submitted blob to DA successfully.")

 			result := c.CheckBatchAvailability(daMetaData)
 			if result.Code != da.StatusSuccess {
-				c.logger.Error("check batch availability: submitted batch but did not get availability success status", "error", err)
+				c.logger.Error("Check batch availability: submitted batch but did not get availability success status.", "error", err)
 				backoff.Sleep()
 				continue
 			}
 			daMetaData.Root = result.CheckMetaData.Root
 			daMetaData.Index = result.CheckMetaData.Index
 			daMetaData.Length = result.CheckMetaData.Length

-			c.logger.Debug("Batch accepted")
+			c.logger.Debug("Blob availability check passed successfully.")

 			return da.ResultSubmitBatch{
 				BaseResult: da.BaseResult{
@@ -269,7 +269,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 	for {
 		select {
 		case <-c.ctx.Done():
-			c.logger.Debug("Context cancelled")
+			c.logger.Debug("Context cancelled.")
 			return da.ResultRetrieveBatch{}
 		default:
 			// Just for backward compatibility, in case no commitments are sent from the Hub, batch can be retrieved using previous implementation.
@@ -285,7 +285,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 				resultRetrieveBatch = result

 				if errors.Is(result.Error, da.ErrRetrieval) {
-					c.logger.Error("retrieve batch", "error", result.Error)
+					c.logger.Error("Retrieve batch.", "error", result.Error)
 					return result.Error
 				}

@@ -296,7 +296,7 @@ func (c *DataAvailabilityLayerClient) RetrieveBatches(daMetaData *da.DASubmitMet
 				retry.Delay(c.config.RetryDelay),
 			)
 			if err != nil {
-				c.logger.Error("RetrieveBatches process failed", "error", err)
+				c.logger.Error("RetrieveBatches process failed.", "error", err)
 			}
 			return resultRetrieveBatch

@@ -308,7 +308,7 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
 	ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
 	defer cancel()

-	c.logger.Debug("Celestia DA getting blob", "height", daMetaData.Height, "namespace", hex.EncodeToString(daMetaData.Namespace), "commitment", hex.EncodeToString(daMetaData.Commitment))
+	c.logger.Debug("Getting blob from DA.", "height", daMetaData.Height, "namespace", hex.EncodeToString(daMetaData.Namespace), "commitment", hex.EncodeToString(daMetaData.Commitment))
 	var batches []*types.Batch
 	blob, err := c.rpc.Get(ctx, daMetaData.Height, daMetaData.Namespace, daMetaData.Commitment)
 	if err != nil {
@@ -333,10 +333,10 @@ func (c *DataAvailabilityLayerClient) retrieveBatches(daMetaData *da.DASubmitMet
 	var batch pb.Batch
 	err = proto.Unmarshal(blob.Data, &batch)
 	if err != nil {
-		c.logger.Error("unmarshal block", "daHeight", daMetaData.Height, "error", err)
+		c.logger.Error("Unmarshal block.", "daHeight", daMetaData.Height, "error", err)
 	}

-	c.logger.Debug("Celestia DA get blob successful", "DA height", daMetaData.Height, "lastBlockHeight", batch.EndHeight)
+	c.logger.Debug("Blob retrieved successfully from DA.", "DA height", daMetaData.Height, "lastBlockHeight", batch.EndHeight)

 	parsedBatch := new(types.Batch)
 	err = parsedBatch.FromProto(&batch)
@@ -378,7 +378,7 @@ func (c *DataAvailabilityLayerClient) retrieveBatchesNoCommitment(dataLayerHeigh
 		var batch pb.Batch
 		err = proto.Unmarshal(blob.Data, &batch)
 		if err != nil {
-			c.logger.Error("unmarshal block", "daHeight", dataLayerHeight, "position", i, "error", err)
+			c.logger.Error("Unmarshal block.", "daHeight", dataLayerHeight, "position", i, "error", err)
 			continue
 		}
 		parsedBatch := new(types.Batch)
@@ -415,14 +415,14 @@ func (c *DataAvailabilityLayerClient) CheckBatchAvailability(daMetaData *da.DASu
 		availabilityResult = result

 		if result.Code != da.StatusSuccess {
-			c.logger.Error("Blob submitted not found in DA. Retrying availability check")
+			c.logger.Error("Blob submitted not found in DA. Retrying availability check.")
 			return da.ErrBlobNotFound
 		}

 		return nil
 	}, retry.Attempts(uint(c.config.RetryAttempts)), retry.DelayType(retry.FixedDelay), retry.Delay(c.config.RetryDelay))
 	if err != nil {
-		c.logger.Error("CheckAvailability process failed", "error", err)
+		c.logger.Error("CheckAvailability process failed.", "error", err)
 	}
 	return availabilityResult
 }
@@ -575,17 +575,15 @@ func (c *DataAvailabilityLayerClient) submit(daBlob da.Blob) (uint64, da.Commitm
 		return 0, nil, fmt.Errorf("do rpc submit: %w", err)
 	}

-	c.logger.Info("Successfully submitted blobs to Celestia", "height", height)
-
 	return height, commitments[0], nil
 }

-func (c *DataAvailabilityLayerClient) getProof(daMetadata *da.DASubmitMetaData) (*blob.Proof, error) {
-	c.logger.Info("Getting proof via RPC call")
+func (c *DataAvailabilityLayerClient) getProof(daMetaData *da.DASubmitMetaData) (*blob.Proof, error) {
+	c.logger.Debug("Getting proof via RPC call.", "height", daMetaData.Height, "namespace", daMetaData.Namespace, "commitment", daMetaData.Commitment)
 	ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
 	defer cancel()

-	proof, err := c.rpc.GetProof(ctx, daMetadata.Height, daMetadata.Namespace, daMetadata.Commitment)
+	proof, err := c.rpc.GetProof(ctx, daMetaData.Height, daMetaData.Namespace, daMetaData.Commitment)
 	if err != nil {
 		return nil, err
 	}
@@ -612,15 +610,15 @@ func (c *DataAvailabilityLayerClient) blobsAndCommitments(daBlob da.Blob) ([]*bl
 }

 func (c *DataAvailabilityLayerClient) validateProof(daMetaData *da.DASubmitMetaData, proof *blob.Proof) (bool, error) {
-	c.logger.Info("Getting inclusion validation via RPC call")
+	c.logger.Debug("Validating proof via RPC call.", "height", daMetaData.Height, "namespace", daMetaData.Namespace, "commitment", daMetaData.Commitment)
 	ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
 	defer cancel()

 	return c.rpc.Included(ctx, daMetaData.Height, daMetaData.Namespace, proof, daMetaData.Commitment)
 }

 func (c *DataAvailabilityLayerClient) getDataAvailabilityHeaders(height uint64) (*header.DataAvailabilityHeader, error) {
-	c.logger.Info("Getting Celestia extended headers via RPC call")
+	c.logger.Debug("Getting extended headers via RPC call.", "height", height)
 	ctx, cancel := context.WithTimeout(c.ctx, c.config.Timeout)
 	defer cancel()

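The availability check above is wrapped in retry.Do with a fixed delay between attempts; the option names match github.com/avast/retry-go, so assuming that package, a self-contained sketch (the check that fails twice before succeeding is fabricated for illustration):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go"
)

var errBlobNotFound = errors.New("blob: not found") // stand-in for da.ErrBlobNotFound

func main() {
	attempts := 0
	// checkAvailability pretends the blob only becomes visible on the third try.
	checkAvailability := func() error {
		attempts++
		if attempts < 3 {
			return errBlobNotFound
		}
		return nil
	}

	err := retry.Do(
		checkAvailability,
		retry.Attempts(5),                 // plays the role of c.config.RetryAttempts
		retry.DelayType(retry.FixedDelay), // constant backoff between tries
		retry.Delay(50*time.Millisecond),  // plays the role of c.config.RetryDelay
	)
	fmt.Printf("attempts=%d err=%v\n", attempts, err)
}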
4 changes: 2 additions & 2 deletions settlement/dymension/dymension.go
@@ -260,7 +260,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
 			return fmt.Errorf("subscription cancelled")

 		case <-subscription.Out():
-			d.logger.Debug("Batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
+			d.logger.Info("batch accepted", "startHeight", batch.StartHeight, "endHeight", batch.EndHeight)
 			return nil

 		case <-timer.C:
@@ -282,7 +282,7 @@ func (d *HubClient) PostBatch(batch *types.Batch, daClient da.Client, daResult *
 			}

 			// all good
-			d.logger.Info("Batch accepted", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight)
+			d.logger.Info("batch accepted", "startHeight", includedBatch.StartHeight, "endHeight", includedBatch.EndHeight, "index", includedBatch.StateIndex)
 			return nil
 		}
 	}
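PostBatch waits on three channels at once: the acceptance-event subscription, a timeout timer, and context cancellation. A self-contained sketch of that wait shape; the string channel is a simplification of the real pubsub subscription:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitForAcceptance mirrors the select shape in PostBatch: block until an
// acceptance event arrives, the timeout fires, or the context is cancelled.
func waitForAcceptance(ctx context.Context, events <-chan string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return fmt.Errorf("subscription cancelled")
	case ev := <-events:
		fmt.Println("batch accepted", "event", ev)
		return nil
	case <-timer.C:
		return fmt.Errorf("timed out waiting for batch acceptance")
	}
}

func main() {
	events := make(chan string, 1)
	events <- "startHeight=1 endHeight=10"
	if err := waitForAcceptance(context.Background(), events, time.Second); err != nil {
		fmt.Println("error:", err)
	}
}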