diff --git a/block/manager.go b/block/manager.go index 0c6461a39..d80752080 100644 --- a/block/manager.go +++ b/block/manager.go @@ -33,6 +33,8 @@ import ( // Manager is responsible for aggregating transactions into blocks. type Manager struct { + logger types.Logger + // Configuration Conf config.BlockManagerConfig Genesis *tmtypes.GenesisDoc @@ -49,26 +51,27 @@ type Manager struct { DAClient da.DataAvailabilityLayerClient SLClient settlement.LayerI - // Data retrieval - Retriever da.BatchRetriever - SyncTargetDiode diodes.Diode - - // Block production - producedSizeCh chan uint64 // channel for the producer to report the size of the block it produced + /* + Production + */ + producedSizeCh chan uint64 // for the producer to report the size of the block it produced - // Submitter + /* + Submission + */ AccumulatedBatchSize atomic.Uint64 // The last height which was submitted to both sublayers lastSubmittedHeight uint64 /* - Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks, - and incoming DA blocks, respectively. + Retrieval */ + Retriever da.BatchRetriever + // get the next target height to sync local state to + targetSyncHeight diodes.Diode + // Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks, + // and incoming DA blocks, respectively. retrieverMutex sync.Mutex - - logger types.Logger - // Cached blocks and commits for applying at future heights. The blocks may not be valid, because // we can only do full validation in sequential order. blockCache map[uint64]CachedBlock @@ -104,21 +107,21 @@ func NewManager( } agg := &Manager{ - Pubsub: pubsub, - p2pClient: p2pClient, - ProposerKey: proposerKey, - Conf: conf, - Genesis: genesis, - LastState: s, - Store: store, - Executor: exec, - DAClient: dalc, - SLClient: settlementClient, - Retriever: dalc.(da.BatchRetriever), - SyncTargetDiode: diodes.NewOneToOne(1, nil), - producedSizeCh: make(chan uint64), - logger: logger, - blockCache: make(map[uint64]CachedBlock), + Pubsub: pubsub, + p2pClient: p2pClient, + ProposerKey: proposerKey, + Conf: conf, + Genesis: genesis, + LastState: s, + Store: store, + Executor: exec, + DAClient: dalc, + SLClient: settlementClient, + Retriever: dalc.(da.BatchRetriever), + targetSyncHeight: diodes.NewOneToOne(1, nil), + producedSizeCh: make(chan uint64), + logger: logger, + blockCache: make(map[uint64]CachedBlock), } return agg, nil diff --git a/block/retriever.go b/block/retriever.go index 6eebe41e4..e885e7255 100644 --- a/block/retriever.go +++ b/block/retriever.go @@ -16,7 +16,7 @@ import ( // from the DA. func (m *Manager) RetrieveLoop(ctx context.Context) { m.logger.Info("started retrieve loop") - syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx)) + targetSyncHeightPoller := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx)) for { select { @@ -24,7 +24,7 @@ func (m *Manager) RetrieveLoop(ctx context.Context) { return default: // Get only the latest sync target - targetHeight := syncTargetPoller.Next() + targetHeight := targetSyncHeightPoller.Next() err := m.syncToTargetHeight(*(*uint64)(targetHeight)) if err != nil { panic(fmt.Errorf("sync until target: %w", err)) diff --git a/block/synctarget.go b/block/synctarget.go index 229bd4682..2a174b3a1 100644 --- a/block/synctarget.go +++ b/block/synctarget.go @@ -26,20 +26,21 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) { return case event := <-subscription.Out(): eventData := event.Data().(*settlement.EventDataNewBatchAccepted) + h := eventData.EndHeight - if eventData.EndHeight <= m.Store.Height() { + if h <= m.Store.Height() { m.logger.Debug( "syncTargetLoop: received new settlement batch accepted with batch end height <= current store height, skipping.", - "height", - eventData.EndHeight, - "currentHeight", + "target sync height (batch end height)", + h, + "current store height", m.Store.Height(), ) continue } - m.SyncTargetDiode.Set(diodes.GenericDataType(&eventData.EndHeight)) - m.logger.Info("Received new sync target height", "height", eventData.EndHeight) - types.RollappHubHeightGauge.Set(float64(eventData.EndHeight)) // TODO(danwt): needed? + m.targetSyncHeight.Set(diodes.GenericDataType(&h)) + m.logger.Info("Set new target sync height", "height", h) + types.RollappHubHeightGauge.Set(float64(h)) // TODO(danwt): needed? case <-subscription.Cancelled(): m.logger.Info("syncTargetLoop subscription canceled") return