Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(concurrency): applying blocks concurrently can lead to unexpected errors #700

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 2 additions & 33 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,38 +116,6 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
m.lastState = newState

m.store.SetHeight(block.Header.Height)

return nil
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()

for {
expectedHeight := m.store.NextHeight()

prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
prevCachedCommit, commitExists := m.prevCommit[expectedHeight]

if !blockExists || !commitExists {
break
}

m.logger.Debug("Applying cached block", "height", expectedHeight)
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("apply previously cached block", "err", err)
return err
}
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
}
return nil
}

Expand Down Expand Up @@ -193,12 +161,13 @@ func (m *Manager) UpdateStateFromApp() error {
return nil
}

// TODO: move to executor.go
func (m *Manager) executeBlock(ctx context.Context, block *types.Block, commit *types.Commit) (*tmstate.ABCIResponses, error) {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.settlementClient.GetProposer()

if err := m.executor.Validate(m.lastState, block, commit, proposer); err != nil {
if err := m.executor.Validate(m.lastState, block, commit, proposer.PublicKey); err != nil {
return &tmstate.ABCIResponses{}, err
}

Expand Down
9 changes: 5 additions & 4 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"time"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
Expand Down Expand Up @@ -137,11 +138,11 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
}

// Validate validates block and commit.
func (e *Executor) Validate(state types.State, block *types.Block, commit *types.Commit, proposer *types.Sequencer) error {
func (e *Executor) Validate(state types.State, block *types.Block, commit *types.Commit, pubkey cryptotypes.PubKey) error {
if err := e.validateBlock(state, block); err != nil {
return err
}
if err := e.validateCommit(proposer, commit, &block.Header); err != nil {
if err := e.validateCommit(pubkey, commit, &block.Header); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -219,13 +220,13 @@ func (e *Executor) validateBlock(state types.State, block *types.Block) error {
return nil
}

func (e *Executor) validateCommit(proposer *types.Sequencer, commit *types.Commit, header *types.Header) error {
func (e *Executor) validateCommit(pubkey cryptotypes.PubKey, commit *types.Commit, header *types.Header) error {
abciHeaderPb := abciconv.ToABCIHeaderPB(header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
if err != nil {
return err
}
if err = commit.Validate(proposer, abciHeaderBytes); err != nil {
if err = commit.Validate(pubkey, abciHeaderBytes); err != nil {
return err
}
return nil
Expand Down
11 changes: 5 additions & 6 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ func TestApplyBlock(t *testing.T) {

// Create proposer for the block
proposerKey := ed25519.GenPrivKey()
proposer := &types.Sequencer{
PublicKey: proposerKey.PubKey(),
}
proposerPubkey := proposerKey.PubKey()

// Create commit for the block
abciHeaderPb := abciconv.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
Expand All @@ -173,7 +172,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = executor.Validate(state, block, commit, proposer)
err = executor.Validate(state, block, commit, proposerPubkey)
require.NoError(err)
resp, err := executor.Execute(context.Background(), state, block)
require.NoError(err)
Expand Down Expand Up @@ -213,7 +212,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block with an invalid commit
err = executor.Validate(state, block, invalidCommit, proposer)
err = executor.Validate(state, block, invalidCommit, proposerPubkey)

// FIXME: This test didn't check for specific error. It was just checking for error.
// If checking for this specific error, it fails
Expand All @@ -230,7 +229,7 @@ func TestApplyBlock(t *testing.T) {
}

// Apply the block
err = executor.Validate(newState, block, commit, proposer)
err = executor.Validate(newState, block, commit, proposerPubkey)
require.NoError(err)
resp, err = executor.Execute(context.Background(), state, block)
require.NoError(err)
Expand Down
12 changes: 8 additions & 4 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ type Manager struct {
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool
lastSubmissionTime atomic.Int64
batchInProcess sync.Mutex
submitBatchMutex sync.Mutex
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex
blockExecutionMutex sync.Mutex

// Logging
logger types.Logger
Expand Down Expand Up @@ -173,7 +173,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
resultRetrieveBatch, err := m.getLatestBatchFromSL()
// Set the syncTarget according to the result
if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
Expand Down Expand Up @@ -227,7 +227,11 @@ func (m *Manager) healthStatusEventCallback(event pubsub.Message) {
m.shouldProduceBlocksCh <- eventData.Healthy
}

// TODO: move to gossip.go
func (m *Manager) applyBlockCallback(event pubsub.Message) {
m.blockExecutionMutex.Lock()
defer m.blockExecutionMutex.Unlock()

m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
Expand All @@ -248,7 +252,7 @@ func (m *Manager) applyBlockCallback(event pubsub.Message) {
}

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultRetrieveBatch, error) {
func (m *Manager) getLatestBatchFromSL() (*settlement.ResultRetrieveBatch, error) {
return m.settlementClient.RetrieveBatch()
}

Expand Down
42 changes: 38 additions & 4 deletions block/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) error {
m.blockExecutionMutex.Lock()
defer m.blockExecutionMutex.Unlock()

currentHeight := m.store.Height()
for currentHeight < syncTarget {
currStateIdx := atomic.LoadUint64(&m.lastState.SLStateIndex) + 1
Expand All @@ -62,7 +65,14 @@
if err != nil {
return err
}

}
// check for cached blocks
err := m.attemptApplyCachedBlocks(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be here. it should be in applyBlockCallback function. Not sure when it was removed from there.

The purpose of this is the following:

  1. full node is at height x
  2. full node got gossiped with blocks at height x+2, x+3,.. , x+n
  3. full node can't apply it until it has x+1 recieved, so it keeps those blocks in the cache
  4. once full node gets x+1, than it applies all the rest from the cache.

Not sure why was this moved here and what's the purpose of it here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's importnat for it to be here

  1. full node missed x
  2. full node got x+1 till x+3000
  3. got sync target of assuming 2000 from SL

it make sense to go over the cache and apply what possible.
otherwise u'll wait for next gossiped block

if err != nil {
m.logger.Debug("Error applying previous cached blocks", "err", err)
}

return nil
}

Expand Down Expand Up @@ -97,10 +107,6 @@
}
}

err := m.attemptApplyCachedBlocks(ctx)
if err != nil {
m.logger.Debug("Error applying previous cached blocks", "err", err)
}
return nil
}

Expand All @@ -122,3 +128,31 @@
// NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
return batchRes
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
for {
expectedHeight := m.store.NextHeight()

prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
prevCachedCommit, commitExists := m.prevCommit[expectedHeight]

if !blockExists || !commitExists {
break
}

m.logger.Debug("Applying cached block", "height", expectedHeight)
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("apply previously cached block", "err", err)
return err
}
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
}
Fixed Show fixed Hide fixed
return nil
}
4 changes: 2 additions & 2 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ func (m *Manager) handleSubmissionTrigger(ctx context.Context) {

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.

if !m.batchInProcess.TryLock() {
if !m.submitBatchMutex.TryLock() {
m.logger.Debug("Batch submission already in process, skipping submission")
return
}

defer m.batchInProcess.Unlock()
defer m.submitBatchMutex.Unlock()

// We try and produce an empty block to make sure releavnt ibc messages will pass through during the batch submission: https://github.com/dymensionxyz/research/issues/173.
err := m.produceBlock(ctx, true)
Expand Down
2 changes: 2 additions & 0 deletions rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,8 @@ func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckT
}

func (c *Client) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {
defer close(outc)

for {
select {
case msg := <-sub.Out():
Expand Down
41 changes: 19 additions & 22 deletions rpc/json/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,25 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
return nil, fmt.Errorf("subscribe: %w", err)
}
go func(subscriptionID []byte) {
for {
select {
case msg := <-out:
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
for msg := range out {
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
}
}(subscriptionID)
Expand Down
5 changes: 3 additions & 2 deletions types/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"errors"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
tmtypes "github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -55,11 +56,11 @@ func (c *Commit) ValidateBasic() error {
}

// Validate performs full validation of a commit.
func (c *Commit) Validate(proposer *Sequencer, msg []byte) error {
func (c *Commit) Validate(pubkey cryptotypes.PubKey, msg []byte) error {
if err := c.ValidateBasic(); err != nil {
return err
}
if !proposer.PublicKey.VerifySignature(msg, c.Signatures[0]) {
if !pubkey.VerifySignature(msg, c.Signatures[0]) {
return ErrInvalidSignature
}
return nil
Expand Down
Loading