
fix(concurrency): applying blocks concurrently can lead to unexpected errors #700

Merged
34 changes: 2 additions & 32 deletions block/block.go
@@ -123,37 +123,6 @@ func (m *Manager) applyBlock(ctx context.Context, block *types.Block, commit *ty
return nil
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
m.applyCachedBlockMutex.Lock()
defer m.applyCachedBlockMutex.Unlock()

for {
expectedHeight := m.store.NextHeight()

prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
prevCachedCommit, commitExists := m.prevCommit[expectedHeight]

if !blockExists || !commitExists {
break
}

m.logger.Debug("Applying cached block", "height", expectedHeight)
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("apply previously cached block", "err", err)
return err
}
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
}
return nil
}

// isHeightAlreadyApplied checks if the block height is already applied to the app.
func (m *Manager) isHeightAlreadyApplied(blockHeight uint64) (bool, error) {
proxyAppInfo, err := m.executor.GetAppInfo()
@@ -198,12 +167,13 @@ func (m *Manager) UpdateStateFromApp() error {
return nil
}

// TODO: move to executor.go
func (m *Manager) executeBlock(ctx context.Context, block *types.Block, commit *types.Commit) (*tmstate.ABCIResponses, error) {
// Currently we're assuming proposer is never nil as it's a pre-condition for
// dymint to start
proposer := m.settlementClient.GetProposer()

if err := m.executor.Validate(m.lastState, block, commit, proposer); err != nil {
if err := m.executor.Validate(m.lastState, block, commit, proposer.PublicKey); err != nil {
return &tmstate.ABCIResponses{}, err
}

9 changes: 5 additions & 4 deletions block/executor.go
@@ -7,6 +7,7 @@ import (
"errors"
"time"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
@@ -137,11 +138,11 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
}

// Validate validates block and commit.
func (e *Executor) Validate(state types.State, block *types.Block, commit *types.Commit, proposer *types.Sequencer) error {
func (e *Executor) Validate(state types.State, block *types.Block, commit *types.Commit, pubkey cryptotypes.PubKey) error {
if err := e.validateBlock(state, block); err != nil {
return err
}
if err := e.validateCommit(proposer, commit, &block.Header); err != nil {
if err := e.validateCommit(pubkey, commit, &block.Header); err != nil {
return err
}
return nil
@@ -221,13 +222,13 @@ func (e *Executor) validateBlock(state types.State, block *types.Block) error {
return nil
}

func (e *Executor) validateCommit(proposer *types.Sequencer, commit *types.Commit, header *types.Header) error {
func (e *Executor) validateCommit(pubkey cryptotypes.PubKey, commit *types.Commit, header *types.Header) error {
abciHeaderPb := abciconv.ToABCIHeaderPB(header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
if err != nil {
return err
}
if err = commit.Validate(proposer, abciHeaderBytes); err != nil {
if err = commit.Validate(pubkey, abciHeaderBytes); err != nil {
return err
}
return nil
11 changes: 5 additions & 6 deletions block/executor_test.go
@@ -157,9 +157,8 @@ func TestApplyBlock(t *testing.T) {

// Create proposer for the block
proposerKey := ed25519.GenPrivKey()
proposer := &types.Sequencer{
PublicKey: proposerKey.PubKey(),
}
proposerPubkey := proposerKey.PubKey()

// Create commit for the block
abciHeaderPb := abciconv.ToABCIHeaderPB(&block.Header)
abciHeaderBytes, err := abciHeaderPb.Marshal()
@@ -173,7 +172,7 @@
}

// Apply the block
err = executor.Validate(state, block, commit, proposer)
err = executor.Validate(state, block, commit, proposerPubkey)
require.NoError(err)
resp, err := executor.Execute(context.Background(), state, block)
require.NoError(err)
@@ -213,7 +212,7 @@
}

// Apply the block with an invalid commit
err = executor.Validate(state, block, invalidCommit, proposer)
err = executor.Validate(state, block, invalidCommit, proposerPubkey)

// FIXME: This test didn't check for specific error. It was just checking for error.
// If checking for this specific error, it fails
@@ -230,7 +229,7 @@
}

// Apply the block
err = executor.Validate(newState, block, commit, proposer)
err = executor.Validate(newState, block, commit, proposerPubkey)
require.NoError(err)
resp, err = executor.Execute(context.Background(), state, block)
require.NoError(err)
29 changes: 24 additions & 5 deletions block/manager.go
@@ -61,9 +61,24 @@ type Manager struct {
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool
lastSubmissionTime atomic.Int64
batchInProcess sync.Mutex
produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex

/*
Guard against triggering a new batch submission when the old one is still going on (taking a while)
*/
submitBatchMutex sync.Mutex

/*
Protect against producing two blocks at once if the first one is taking a while
Also, used to protect against the block production that occurs when batch submission thread
creates its empty block.
*/
produceBlockMutex sync.Mutex

/*
Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
and incoming DA blocks, respectively.
*/
executeBlockMutex sync.Mutex

// Logging
logger types.Logger
@@ -173,7 +188,7 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
resultRetrieveBatch, err := m.getLatestBatchFromSL()
// Set the syncTarget according to the result
if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
@@ -227,7 +242,11 @@ func (m *Manager) healthStatusEventCallback(event pubsub.Message) {
m.shouldProduceBlocksCh <- eventData.Healthy
}

// TODO: move to gossip.go
func (m *Manager) applyBlockCallback(event pubsub.Message) {
m.executeBlockMutex.Lock()
defer m.executeBlockMutex.Unlock()

m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.prevBlock))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
@@ -248,7 +267,7 @@ func (m *Manager) applyBlockCallback(event pubsub.Message) {
}

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultRetrieveBatch, error) {
func (m *Manager) getLatestBatchFromSL() (*settlement.ResultRetrieveBatch, error) {
return m.settlementClient.RetrieveBatch()
}

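The new doc comments on the Manager struct describe what each lock guards. A stripped-down sketch of that layout (hypothetical; the real Manager fields and method bodies are elided) looks roughly like this:

```go
// Hypothetical, stripped-down sketch of the locking layout described in the
// diff above. The lock names mirror the PR; the method bodies are placeholders.
package block

import (
	"context"
	"sync"
)

type manager struct {
	submitBatchMutex  sync.Mutex // batch submission: contenders skip via TryLock (see block/submit.go)
	produceBlockMutex sync.Mutex // block production: ticker loop vs. the empty block made at submission time
	executeBlockMutex sync.Mutex // block application: gossip callback vs. DA retriever
}

// produceBlock is called both from the production loop and from
// handleSubmissionTrigger (for the empty block), so it serializes on
// produceBlockMutex.
func (m *manager) produceBlock(ctx context.Context, allowEmpty bool) error {
	m.produceBlockMutex.Lock()
	defer m.produceBlockMutex.Unlock()
	// ... create, apply and gossip a new block ...
	return nil
}

// applyBlockCallback handles gossiped blocks; processNextDABatch handles blocks
// retrieved from the DA layer. Both take executeBlockMutex, so the two routines
// never apply blocks at the same time.
func (m *manager) applyBlockCallback(ctx context.Context) {
	m.executeBlockMutex.Lock()
	defer m.executeBlockMutex.Unlock()
	// ... cache or apply the gossiped block ...
}
```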
46 changes: 42 additions & 4 deletions block/retriever.go
@@ -42,6 +42,7 @@
// It fetches the batches from the settlement, gets the DA height and gets
// the actual blocks from the DA.
func (m *Manager) syncUntilTarget(ctx context.Context, syncTarget uint64) error {

currentHeight := m.store.Height()
for currentHeight < syncTarget {
currStateIdx := atomic.LoadUint64(&m.lastState.SLStateIndex) + 1
@@ -62,7 +63,14 @@
if err != nil {
return err
}

}
// check for cached blocks
err := m.attemptApplyCachedBlocks(ctx)
Contributor:

This shouldn't be here; it should be in the applyBlockCallback function. Not sure when it was removed from there.

The purpose of this is the following:

  1. The full node is at height x.
  2. The full node gets gossiped blocks at heights x+2, x+3, ..., x+n.
  3. The full node can't apply them until it has received x+1, so it keeps those blocks in the cache.
  4. Once the full node gets x+1, it applies all the rest from the cache.

Not sure why this was moved here or what its purpose is here.

Contributor Author:

I think it's important for it to be here as well (see the caching sketch after this file's diff):

  1. The full node missed x.
  2. The full node got x+1 through x+3000.
  3. It got a sync target of, say, 2000 from the SL.

It makes sense to go over the cache and apply whatever is possible; otherwise you'd wait for the next gossiped block.

if err != nil {
m.logger.Debug("Error applying previous cached blocks", "err", err)
}

return nil
}

@@ -85,6 +93,9 @@

m.logger.Debug("retrieved batches", "n", len(batchResp.Batches), "daHeight", daMetaData.Height)

m.executeBlockMutex.Lock()
defer m.executeBlockMutex.Unlock()

for _, batch := range batchResp.Batches {
for i, block := range batch.Blocks {
if block.Header.Height != m.store.NextHeight() {
@@ -97,10 +108,6 @@
}
}

err := m.attemptApplyCachedBlocks(ctx)
if err != nil {
m.logger.Debug("Error applying previous cached blocks", "err", err)
}
return nil
}

@@ -122,3 +129,34 @@
// NMT proofs (availRes.MetaData.Proofs) are included in the result batchRes, necessary to be included in the dispute
return batchRes
}

func (m *Manager) attemptApplyCachedBlocks(ctx context.Context) error {
m.executeBlockMutex.Lock()
defer m.executeBlockMutex.Unlock()

for {
expectedHeight := m.store.NextHeight()

prevCachedBlock, blockExists := m.prevBlock[expectedHeight]
prevCachedCommit, commitExists := m.prevCommit[expectedHeight]

if !blockExists || !commitExists {
break
}

m.logger.Debug("Applying cached block", "height", expectedHeight)
err := m.applyBlock(ctx, prevCachedBlock, prevCachedCommit, blockMetaData{source: gossipedBlock})
if err != nil {
m.logger.Debug("apply previously cached block", "err", err)
return err
}
}

for k := range m.prevBlock {
if k <= m.store.Height() {
delete(m.prevBlock, k)
delete(m.prevCommit, k)
}
}
return nil
}
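The review thread above debates where attemptApplyCachedBlocks should be called from; the caching side itself (how blocks that arrive ahead of the store height end up in prevBlock/prevCommit) isn't visible in this diff. A minimal, self-contained toy version of the cache-ahead-and-drain pattern, with the dymint types replaced by placeholders, might look like this:

```go
// Toy sketch of the "cache ahead, drain when contiguous" pattern the thread
// describes. The real Manager keys prevBlock/prevCommit by height and drains
// them in attemptApplyCachedBlocks; this standalone version keeps only heights.
package main

import "fmt"

type blockCache struct {
	next    uint64            // next height the store expects
	pending map[uint64]string // height -> block payload (elided)
}

// add caches a block that arrived at or ahead of the expected height, then
// applies every block that is now contiguous with the store height.
func (c *blockCache) add(height uint64, block string) {
	if height >= c.next {
		c.pending[height] = block
	}
	for {
		b, ok := c.pending[c.next]
		if !ok {
			break
		}
		fmt.Printf("applying height %d: %s\n", c.next, b)
		delete(c.pending, c.next)
		c.next++
	}
}

func main() {
	c := &blockCache{next: 1, pending: map[uint64]string{}}
	c.add(3, "b3") // cached, cannot apply yet
	c.add(2, "b2") // cached, still waiting for height 1
	c.add(1, "b1") // fills the gap: applies 1, 2 and 3 in order
}
```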
8 changes: 5 additions & 3 deletions block/submit.go
@@ -37,14 +37,16 @@ func (m *Manager) handleSubmissionTrigger(ctx context.Context) {

// Submit batch if we've reached the batch size and there isn't another batch currently in submission process.

if !m.batchInProcess.TryLock() {
if !m.submitBatchMutex.TryLock() {
m.logger.Debug("Batch submission already in process, skipping submission")
return
}

defer m.batchInProcess.Unlock()
defer m.submitBatchMutex.Unlock()

// We try and produce an empty block to make sure releavnt ibc messages will pass through during the batch submission: https://github.com/dymensionxyz/research/issues/173.
// For various reasons we add an empty block onto every batch. For example, to make sure relayers
// have the necessary commit in order to relay a TX from the first of two blocks.
// See https://github.com/dymensionxyz/research/issues/173 for details.
err := m.produceBlock(ctx, true)
if err != nil {
m.logger.Error("while producing empty block", "error", err)
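handleSubmissionTrigger relies on TryLock so that a trigger firing while a submission is still running is skipped rather than queued behind it. A self-contained sketch of that skip-if-busy pattern (names are illustrative, not the real dymint code):

```go
// Sketch of the "skip if busy" pattern handleSubmissionTrigger uses with
// submitBatchMutex: a trigger that fires while a submission is still in
// flight returns immediately instead of waiting for the lock.
package main

import (
	"fmt"
	"sync"
	"time"
)

type submitter struct {
	mu sync.Mutex // plays the role of submitBatchMutex
}

func (s *submitter) trigger(id int) {
	if !s.mu.TryLock() {
		fmt.Printf("trigger %d: submission already in process, skipping\n", id)
		return
	}
	defer s.mu.Unlock()
	fmt.Printf("trigger %d: submitting batch\n", id)
	time.Sleep(100 * time.Millisecond) // stand-in for a slow settlement-layer submission
}

func main() {
	s := &submitter{}
	var wg sync.WaitGroup
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go func(id int) { defer wg.Done(); s.trigger(id) }(i)
	}
	wg.Wait()
}
```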
2 changes: 2 additions & 0 deletions rpc/client/client.go
@@ -809,6 +809,8 @@ func (c *Client) CheckTx(ctx context.Context, tx types.Tx) (*ctypes.ResultCheckT
}

func (c *Client) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) {
defer close(outc)

for {
select {
case msg := <-sub.Out():
41 changes: 19 additions & 22 deletions rpc/json/service.go
@@ -120,28 +120,25 @@ func (s *service) Subscribe(req *http.Request, args *subscribeArgs, wsConn *wsCo
return nil, fmt.Errorf("subscribe: %w", err)
}
go func(subscriptionID []byte) {
for {
select {
case msg := <-out:
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
for msg := range out {
// build the base response
var resp rpctypes.RPCResponse
// Check if subscriptionID is string or int and generate the rest of the response accordingly
subscriptionIDInt, err := strconv.Atoi(string(subscriptionID))
if err != nil {
s.logger.Info("Failed to convert subscriptionID to int")
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCStringID(subscriptionID), msg)
} else {
resp = rpctypes.NewRPCSuccessResponse(rpctypes.JSONRPCIntID(subscriptionIDInt), msg)
}
// Marshal response to JSON and send it to the websocket queue
jsonBytes, err := json.MarshalIndent(resp, "", " ")
if err != nil {
s.logger.Error("marshal RPCResponse to JSON", "err", err)
continue
}
if wsConn != nil {
wsConn.queue <- jsonBytes
}
}
}(subscriptionID)
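The client.go and service.go changes work as a pair: eventsRoutine now closes its output channel on return, which lets the Subscribe goroutine collapse its select loop into a plain for ... range that exits once the channel is closed. A minimal stand-alone illustration of the pattern (not dymint code):

```go
// The producer closes the channel when it is done; the consumer's range loop
// then terminates on its own, with no extra done-channel or select needed.
package main

import "fmt"

func producer(out chan<- string) {
	defer close(out) // mirrors the `defer close(outc)` added to eventsRoutine
	for i := 0; i < 3; i++ {
		out <- fmt.Sprintf("event %d", i)
	}
}

func main() {
	out := make(chan string)
	go producer(out)

	// Mirrors the rewritten Subscribe goroutine: drain until the producer closes.
	for msg := range out {
		fmt.Println("received:", msg)
	}
	fmt.Println("channel closed, consumer exits")
}
```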
5 changes: 3 additions & 2 deletions types/validation.go
@@ -3,6 +3,7 @@ package types
import (
"errors"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
tmtypes "github.com/tendermint/tendermint/types"
)

@@ -55,11 +56,11 @@ func (c *Commit) ValidateBasic() error {
}

// Validate performs full validation of a commit.
func (c *Commit) Validate(proposer *Sequencer, msg []byte) error {
func (c *Commit) Validate(pubkey cryptotypes.PubKey, msg []byte) error {
if err := c.ValidateBasic(); err != nil {
return err
}
if !proposer.PublicKey.VerifySignature(msg, c.Signatures[0]) {
if !pubkey.VerifySignature(msg, c.Signatures[0]) {
return ErrInvalidSignature
}
return nil
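Commit.Validate now takes the proposer's cosmos-sdk public key rather than a *types.Sequencer. A self-contained sketch of the signature check it performs, using the cosmos-sdk ed25519 keys the updated executor test already uses; the dymint Commit and Header types are replaced by a plain byte slice standing in for the marshalled ABCI header:

```go
// Sketch of the core check Commit.Validate performs after this PR: verify the
// first commit signature over the marshalled header bytes with the proposer's
// cosmos-sdk public key.
package main

import (
	"fmt"

	"github.com/cosmos/cosmos-sdk/crypto/keys/ed25519"
	cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
)

// validateCommitSig mirrors the new signature shape: it receives a
// cryptotypes.PubKey rather than a *types.Sequencer.
func validateCommitSig(pubkey cryptotypes.PubKey, msg, sig []byte) error {
	if !pubkey.VerifySignature(msg, sig) {
		return fmt.Errorf("invalid signature")
	}
	return nil
}

func main() {
	proposerKey := ed25519.GenPrivKey()
	headerBytes := []byte("marshalled ABCI header stand-in")

	sig, err := proposerKey.Sign(headerBytes)
	if err != nil {
		panic(err)
	}

	// Callers such as Executor.validateCommit now pass proposer.PublicKey here.
	if err := validateCommitSig(proposerKey.PubKey(), headerBytes, sig); err != nil {
		panic(err)
	}
	fmt.Println("commit signature verified")
}
```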