Skip to content

Commit

Permalink
renames and reshuffles
Browse files Browse the repository at this point in the history
  • Loading branch information
danwt committed May 13, 2024
1 parent c4db6ff commit 6b3f56b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 36 deletions.
57 changes: 30 additions & 27 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (

// Manager is responsible for aggregating transactions into blocks.
type Manager struct {
logger types.Logger

// Configuration
Conf config.BlockManagerConfig
Genesis *tmtypes.GenesisDoc
Expand All @@ -49,26 +51,27 @@ type Manager struct {
DAClient da.DataAvailabilityLayerClient
SLClient settlement.LayerI

// Data retrieval
Retriever da.BatchRetriever
SyncTargetDiode diodes.Diode

// Block production
producedSizeCh chan uint64 // channel for the producer to report the size of the block it produced
/*
Production
*/
producedSizeCh chan uint64 // for the producer to report the size of the block it produced

// Submitter
/*
Submission
*/
AccumulatedBatchSize atomic.Uint64
// The last height which was submitted to both sublayers
lastSubmittedHeight uint64

/*
Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
and incoming DA blocks, respectively.
Retrieval
*/
Retriever da.BatchRetriever
// get the next target height to sync local state to
targetSyncHeight diodes.Diode
// Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
// and incoming DA blocks, respectively.
retrieverMutex sync.Mutex

logger types.Logger

// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache map[uint64]CachedBlock
Expand Down Expand Up @@ -104,21 +107,21 @@ func NewManager(
}

agg := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
LastState: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
SyncTargetDiode: diodes.NewOneToOne(1, nil),
producedSizeCh: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
LastState: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
producedSizeCh: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
}

return agg, nil
Expand Down
4 changes: 2 additions & 2 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ import (
// from the DA.
func (m *Manager) RetrieveLoop(ctx context.Context) {
m.logger.Info("started retrieve loop")
syncTargetPoller := diodes.NewPoller(m.SyncTargetDiode, diodes.WithPollingContext(ctx))
targetSyncHeightPoller := diodes.NewPoller(m.targetSyncHeight, diodes.WithPollingContext(ctx))

for {
select {
case <-ctx.Done():
return
default:
// Get only the latest sync target
targetHeight := syncTargetPoller.Next()
targetHeight := targetSyncHeightPoller.Next()
err := m.syncToTargetHeight(*(*uint64)(targetHeight))
if err != nil {
panic(fmt.Errorf("sync until target: %w", err))
Expand Down
15 changes: 8 additions & 7 deletions block/synctarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,21 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
return
case event := <-subscription.Out():
eventData := event.Data().(*settlement.EventDataNewBatchAccepted)
h := eventData.EndHeight

if eventData.EndHeight <= m.Store.Height() {
if h <= m.Store.Height() {
m.logger.Debug(
"syncTargetLoop: received new settlement batch accepted with batch end height <= current store height, skipping.",
"height",
eventData.EndHeight,
"currentHeight",
"target sync height (batch end height)",
h,
"current store height",
m.Store.Height(),
)
continue
}
m.SyncTargetDiode.Set(diodes.GenericDataType(&eventData.EndHeight))
m.logger.Info("Received new sync target height", "height", eventData.EndHeight)
types.RollappHubHeightGauge.Set(float64(eventData.EndHeight)) // TODO(danwt): needed?
m.targetSyncHeight.Set(diodes.GenericDataType(&h))
m.logger.Info("Set new target sync height", "height", h)
types.RollappHubHeightGauge.Set(float64(h)) // TODO(danwt): needed?
case <-subscription.Cancelled():
m.logger.Info("syncTargetLoop subscription canceled")
return
Expand Down

0 comments on commit 6b3f56b

Please sign in to comment.