-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(concurrency): applying blocks concurrently can lead to unexpected errors #700
Changes from 5 commits
a66eb8c
663ae77
d388822
c276b07
dd0a575
45cecf0
69521b3
6433abc
481c705
d343dca
3fd1aa5
8ad28b5
e234d07
0a43157
8c9a1ac
a329116
086e320
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -42,6 +42,9 @@ | |
// It fetches the batches from the settlement, gets the DA height and gets | ||
// the actual blocks from the DA. | ||
func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) error { | ||
m.executeBlockMutex.Lock() | ||
defer m.executeBlockMutex.Unlock() | ||
|
||
currentHeight := m.store.Height() | ||
for currentHeight < syncTarget { | ||
currStateIdx := atomic.LoadUint64(&m.lastState.SLStateIndex) + 1 | ||
|
@@ -62,7 +65,14 @@ | |
if err != nil { | ||
return err | ||
} | ||
|
||
} | ||
// check for cached blocks | ||
err := m.attemptApplyCachedBlocks(ctx) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shouldn't be here. it should be in The purpose of this is the following:
Not sure why was this moved here and what's the purpose of it here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's importnat for it to be here
it make sense to go over the cache and apply what possible. |
||
if err != nil { | ||
m.logger.Debug("Error applying previous cached blocks", "err", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
|
@@ -97,10 +107,6 @@ | |
} | ||
} | ||
|
||
err := m.attemptApplyCachedBlocks(ctx) | ||
if err != nil { | ||
m.logger.Debug("Error applying previous cached blocks", "err", err) | ||
} | ||
return nil | ||
} | ||
|
||
|
@@ -122,3 +128,31 @@ | |
// NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute | ||
return batchRes | ||
} | ||
|
||
func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error { | ||
for { | ||
expectedHeight := m.store.NextHeight() | ||
|
||
prevCachedBlock, blockExists := m.prevBlock[expectedHeight] | ||
prevCachedCommit, commitExists := m.prevCommit[expectedHeight] | ||
|
||
if !blockExists || !commitExists { | ||
break | ||
} | ||
|
||
m.logger.Debug("Applying cached block", "height", expectedHeight) | ||
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock}) | ||
if err != nil { | ||
m.logger.Debug("apply previously cached block", "err", err) | ||
return err | ||
} | ||
} | ||
|
||
for k := range m.prevBlock { | ||
if k <= m.store.Height() { | ||
delete(m.prevBlock, k) | ||
delete(m.prevCommit, k) | ||
} | ||
} | ||
|
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This process (
syncUntilTarget
) can be long as it needs to actually fetch the data from the DA upon new sync target (which can be dozens of seconds or even longer).seems to me like during this time no gossiped blocks will be applied as the lock will block it (so basically rpc nodes won't provide "real-time" responses while syncing from the da) .
I think the lock should be more fine grained on the actual execution of the blocks and not include the fetching from the da.
Why are we not just putting it directly on the
applyBlock
/executeBlock
function to only allow one caller access?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aggreed. fetching data won't be mutex locked.
gossiped block will be applied if it's height is correct, it quite conflicts with syncing process. I think it's fine for gossiped block to wait while syncing in progress.
in the happy flow there's no syncing anyway (as blocks are gossiped)
The lock is more lock regarding the
store.Height()
and not only on executionThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still not sure I understand why can't
executeBlockMutex
can't live inside applyBlock?the only operation I see that is being protected with current implementation is
m.store.NextHeight()
.If that's indeed the case I suggest changing the
nextHeight
or in generalheight
access to be atomic and by that simplify the lock and only put it on theapplyBlock
function.