fix(concurrency): applying blocks concurrently can lead to unexpected errors #700

Merged
11 changes: 8 additions & 3 deletions block/block.go
@@ -118,9 +118,10 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
return nil
}

// TODO: move to gossip.go
func (m *Manager) attemptApplyCachedBlocks() error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()
m.executeBlockMutex.Lock()
Contributor:

As mentioned before, I think it's best to have this lock on the ApplyBlock function (and change its name accordingly): even if you hit the race condition on NextHeight because a block at that height is currently being applied, ApplyBlock has a sanity check on the correct height and the block won't be applied.

I think it makes the code much more elegant, simplifies reading, and removes the need to deal with future applyBlock callers.

Contributor Author:

This mutex is not only for applyBlock; it synchronizes the retriever thread and the gossip thread. There are multiple fields that can be accessed concurrently and need protection (e.g. the block cache, the store height, and the "state" updated by applyBlock).

I can change it if you prefer, but IMO that's not hermetic enough.
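(For context, a minimal sketch of the alternative the reviewer describes: taking the lock inside the apply path and relying on a height sanity check. The wrapper name applyBlockWithLock is illustrative, not code from this PR.)

```go
// Illustrative sketch only (not part of this PR): the alternative discussed
// above would take the lock inside the apply path itself and rely on a
// height sanity check to drop a block that was already applied.
func (m *Manager) applyBlockWithLock(block *types.Block, commit *types.Commit, meta blockMetaData) error {
	m.executeBlockMutex.Lock()
	defer m.executeBlockMutex.Unlock()

	// Sanity check: if another routine already applied this height, skip it.
	if block.Header.Height != m.store.NextHeight() {
		return nil
	}
	return m.applyBlock(block, commit, meta)
}
```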

defer m.executeBlockMutex.Unlock()

for {
expectedHeight := m.store.NextHeight()
@@ -140,13 +141,17 @@ func (m *Manager) attemptApplyCachedBlocks() error {
m.logger.Debug("applied cached block", "height", expectedHeight)
}

return nil
Contributor Author (@mtsitrin, Apr 18, 2024):

I removed the pruning from each gossiped block, as it's not efficient: it iterates over all the cached blocks. It will be called when syncing the node instead.

}

// pruneCache prunes the cache of gossiped blocks.
func (m *Manager) pruneCache() {
Contributor:

Not sure why we need this pruneCache vs. just deleting each cached block after we apply it.

Contributor Author:

I guess it was to handle the case where blocks weren't applied from the cache but from the sync target.
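(For reference, a minimal sketch of the alternative raised above: deleting each cached entry right after it is applied inside attemptApplyCachedBlocks. This is an assumption about how it could look, not the code that was merged.)

```go
// Hypothetical alternative to pruneCache: drop each cached block/commit as
// soon as it has been applied. Sketch against the fields shown in this PR.
func (m *Manager) attemptApplyCachedBlocksAndPrune() error {
	m.executeBlockMutex.Lock()
	defer m.executeBlockMutex.Unlock()

	for {
		expectedHeight := m.store.NextHeight()
		block, blockOK := m.prevBlock[expectedHeight]
		commit, commitOK := m.prevCommit[expectedHeight]
		if !blockOK || !commitOK {
			break
		}
		if err := m.applyBlock(block, commit, blockMetaData{source: gossipedBlock}); err != nil {
			return err
		}
		// Delete the entry immediately instead of pruning in a separate pass.
		delete(m.prevBlock, expectedHeight)
		delete(m.prevCommit, expectedHeight)
	}
	return nil
}
```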

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
}
return nil
}

// isHeightAlreadyApplied checks if the block height is already applied to the app.
42 changes: 30 additions & 12 deletions block/manager.go
@@ -61,13 +61,29 @@ type Manager struct {
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool
lastSubmissionTime atomic.Int64
batchInProcess sync.Mutex
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex

/*
Guard against triggering a new batch submission when the old one is still going on (taking a while)
*/
submitBatchMutex sync.Mutex

/*
Protect against producing two blocks at once if the first one is taking a while
Also, used to protect against the block production that occurs when batch submission thread
creates its empty block.
*/
produceBlockMutex sync.Mutex

/*
Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
and incoming DA blocks, respectively.
*/
executeBlockMutex sync.Mutex

// Logging
logger types.Logger

//TODO: refactor to kvstore to allow quicker iteration when applying blocks by order
// Cached blocks and commits for applying at future heights. Invariant: the block and commit are .Valid() (validated sigs etc)
prevBlock map[uint64]*types.Block
prevCommit map[uint64]*types.Commit
@@ -217,6 +233,7 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
m.shouldProduceBlocksCh <- eventData.Error == nil
}

// TODO: move to gossip.go
// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
@@ -230,17 +247,18 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
return
}

// if height is expected, apply
// if height is higher than expected (future block), cache
if block.Header.Height == m.store.NextHeight() {
err := m.applyBlock(&block, &commit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Error("apply gossiped block", "err", err)
}
} else if block.Header.Height > m.store.NextHeight() {
nextHeight := m.store.NextHeight()
if block.Header.Height >= nextHeight {
m.prevBlock[block.Header.Height] = &block
m.prevCommit[block.Header.Height] = &commit
m.logger.Debug("Caching block", "block height", block.Header.Height, "store height", m.store.Height())
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.store.Height())
}

if block.Header.Height == nextHeight {
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying cached blocks", "err", err)
}
}
}

17 changes: 12 additions & 5 deletions block/retriever.go
@@ -62,6 +62,15 @@ func (m *Manager) syncUntilTarget(syncTarget uint64) error {
}
}
m.logger.Info("Synced", "current height", currentHeight, "syncTarget", syncTarget)

// check for cached blocks
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
}

m.pruneCache()

return nil
}

@@ -84,6 +93,9 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error {

m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height)

m.executeBlockMutex.Lock()
defer m.executeBlockMutex.Unlock()

for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
if block.Header.Height != m.store.NextHeight() {
@@ -99,11 +111,6 @@ func (m *Manager) processNextDABatch(daMetaData *da.DASubmitMetaData) error {
}
}
}

err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying previous cached blocks", "err", err)
}
return nil
}

4 changes: 2 additions & 2 deletions block/submit.go
@@ -32,11 +32,11 @@ func (m *Manager) SubmitLoop(ctx context.Context) {
// Finally, it submits the next batch of blocks and updates the sync target to the height of
// the last block in the submitted batch.
func (m *Manager) handleSubmissionTrigger(ctx context.Context) {
if !m.batchInProcess.TryLock() { // Attempt to lock for batch processing
if !m.submitBatchMutex.TryLock() { // Attempt to lock for batch processing
m.logger.Debug("Batch submission already in process, skipping submission")
return
}
defer m.batchInProcess.Unlock() // Ensure unlocking at the end
defer m.submitBatchMutex.Unlock() // Ensure unlocking at the end

// Load current sync target and height to determine if new blocks are available for submission.
syncTarget, height := m.syncTarget.Load(), m.store.Height()
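(As an aside, a minimal standalone sketch of the non-blocking TryLock guard that handleSubmissionTrigger uses above; the helper name guarded is illustrative and not part of this repository.)

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// guarded runs work only if no other invocation currently holds the mutex,
// mirroring the TryLock guard in handleSubmissionTrigger.
func guarded(mu *sync.Mutex, name string, work func()) {
	if !mu.TryLock() {
		fmt.Println(name, "skipped: a previous run is still in progress")
		return
	}
	defer mu.Unlock()
	work()
}

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			guarded(&mu, fmt.Sprintf("trigger-%d", i), func() {
				time.Sleep(100 * time.Millisecond) // simulate a slow batch submission
			})
		}(i)
	}
	wg.Wait()
}
```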
1 change: 1 addition & 0 deletions rpc/client/client.go
@@ -810,6 +810,7 @@ func (c *Client) CheckTx(ctx context.Context, tx tmtypes.Tx) (*ctypes.ResultChec
}

func (c *Client) eventsRoutine(sub tmtypes.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {
defer close(outc)
for {
select {
case msg := <-sub.Out():
41 changes: 19 additions & 22 deletions rpc/json/service.go
@@ -120,28 +120,25 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
return nil, fmt.Errorf("subscribe: %w", err)
}
go func(subscriptionID []byte) {
for {
select {
case msg := <-out:
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
for msg := range out {
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
}
}(subscriptionID)
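(For context, a minimal standalone sketch of the pattern these last two changes rely on: eventsRoutine in client.go now closes outc when it returns, so the subscription goroutine's for msg := range out loop above exits cleanly once the channel is closed. The example is illustrative, not code from the rpc package.)

```go
package main

import "fmt"

// producer sends events and closes the channel when it is done, the same way
// eventsRoutine now defers close(outc).
func producer(out chan<- string) {
	defer close(out)
	for i := 0; i < 3; i++ {
		out <- fmt.Sprintf("event-%d", i)
	}
}

func main() {
	out := make(chan string)
	go producer(out)

	// The consumer loop ends when the channel is closed and drained, so the
	// subscription goroutine no longer needs a select loop to exit.
	for msg := range out {
		fmt.Println("received:", msg)
	}
	fmt.Println("subscription goroutine exits cleanly")
}
```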