Skip to content

Commit

Permalink
refactor aggregator/nonaggregator loop start
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed May 15, 2024
1 parent 423878a commit 50bec52
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,31 +151,24 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

//Fullnode loop can start before syncing from DA
if !isSequencer {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
m.startNonSequencerLoop(ctx)
}

// TODO: populate the accumulatedSize on startup
//Wait till DA is up and running
<-m.DAClient.Started()
//Start syncing from DA
err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

//Aggregator producer loop is started only when finished syncing from DA
if isSequencer {
// TODO: populate the accumulatedSize on startup
//Wait till DA is up and running
<-m.DAClient.Started()
err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
m.startSequencerLoop(ctx)
}

return nil
}

Expand Down Expand Up @@ -224,15 +217,13 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
m.retrieverMutex.Lock() // needed to protect blockCache access

_, found := m.blockCache[block.Header.Height]
// It is not strictly necessary to return early, for correctness, but doing so helps us avoid mutex pressure and unnecessary repeated attempts to apply cached blocks
if found {
if found || block.Header.Height < m.Store.NextHeight() {
m.retrieverMutex.Unlock()
return
}

m.logger.Debug("Received new block via gossip", "block height", block.Header.Height, "store height", m.Store.Height(), "n cachedBlocks", len(m.blockCache))
m.logger.Debug("Received new block via gossip", "height", block.Header.Height, "n cachedBlocks", len(m.blockCache))

nextHeight := m.Store.NextHeight()
if block.Header.Height >= nextHeight {
Expand All @@ -259,3 +250,14 @@ func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc, logger type

return s, err
}

func (m *Manager) startNonSequencerLoop(ctx context.Context) {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
go m.RetrieveLoop(ctx)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
go m.SyncTargetLoop(ctx)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

func (m *Manager) startSequencerLoop(ctx context.Context) {
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
}

0 comments on commit 50bec52

Please sign in to comment.