Skip to content

Commit

Permalink
refactor aggregator/nonaggregator loop start
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed May 14, 2024
1 parent f47380c commit eeb7272
Showing 1 changed file with 34 additions and 15 deletions.
49 changes: 34 additions & 15 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,24 @@ func (m *Manager) Start(ctx context.Context) error {
}
}

if isAggregator {
// TODO: populate the accumulatedSize on startup
//Wait till DA is up and running
<-m.DAClient.Started()
err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
//Fullnode loop can start before syncing from DA
if !isAggregator {
m.startNonAggregatorLoop(ctx)
}

// TODO: populate the accumulatedSize on startup
//Wait till DA is up and running
<-m.DAClient.Started()
//Start syncing from DA
err = m.syncBlockManager()
if err != nil {
return fmt.Errorf("sync block manager: %w", err)
}

//Aggregator producer loop is started only when finished syncing from DA
if isAggregator {
m.startAggregatorLoop(ctx)
}
return nil
}

Expand Down Expand Up @@ -216,6 +218,12 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit

_, found := m.blockCache[block.Header.Height]
if found || block.Header.Height < m.Store.NextHeight() {
m.retrieverMutex.Unlock()
return
}
m.logger.Info("Received new block via gossip", "height", block.Header.Height, "n cachedBlocks", len(m.blockCache))

nextHeight := m.Store.NextHeight()
Expand All @@ -224,7 +232,7 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
Block: &block,
Commit: &commit,
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
m.logger.Info("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}
m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
err := m.attemptApplyCachedBlocks()
Expand All @@ -243,3 +251,14 @@ func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc, logger type

return s, err
}

func (m *Manager) startNonAggregatorLoop(ctx context.Context) {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
go m.RetrieveLoop(ctx)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
go m.SyncTargetLoop(ctx)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

func (m *Manager) startAggregatorLoop(ctx context.Context) {
go m.ProduceBlockLoop(ctx)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
go m.SubmitLoop(ctx)

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

0 comments on commit eeb7272

Please sign in to comment.