Skip to content

Commit

Permalink
Merge branch 'main' into srene/656-gossiped-block-validation
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin committed May 5, 2024
2 parents addc89c + bb1fd1c commit b9c38c5
Show file tree
Hide file tree
Showing 67 changed files with 7,682 additions and 3,135 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ jobs:
git config user.name 'github-actions'
git config user.email '[email protected]'
git add CHANGELOG.md
git commit -m "Update CHANGELOG.md [skip ci]"
git commit -m "Update CHANGELOG.md"
git push origin HEAD:refs/heads/${{ github.head_ref }}
31 changes: 31 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
with-expecter: true
recursive: true
packages:
github.com/dymensionxyz/dymint/settlement/dymension:
interfaces:
CosmosClient:
github.com/dymensionxyz/dymint/settlement:
interfaces:
LayerI:
HubClient:
github.com/dymensionxyz/dymension/v3/x/sequencer/types:
interfaces:
QueryClient:
github.com/dymensionxyz/dymension/v3/x/rollapp/types:
interfaces:
QueryClient:
github.com/tendermint/tendermint/abci/types:
interfaces:
Application:
github.com/tendermint/tendermint/proxy:
interfaces:
AppConnConsensus:
AppConns:
github.com/dymensionxyz/dymint/da/celestia/types:
interfaces:
CelestiaRPCClient:
github.com/dymensionxyz/dymint/da/avail:
interfaces:
SubstrateApiI:


20 changes: 19 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-04-29)
# [](https://github.com/dymensionxyz/dymint/compare/v1.1.0-rc02...v) (2024-05-03)


Check failure on line 3 in CHANGELOG.md

View workflow job for this annotation

GitHub Actions / markdownlint

Multiple consecutive blank lines [Expected: 1; Actual: 2]
### Bug Fixes

Check failure on line 4 in CHANGELOG.md

View workflow job for this annotation

GitHub Actions / markdownlint

Heading levels should only increment by one level at a time [Expected: h2; Actual: h3]

* **bug:** sync from da and p2p when starting a node ([#763](https://github.com/dymensionxyz/dymint/issues/763)) ([68ffd05](https://github.com/dymensionxyz/dymint/commit/68ffd05794949ddc42df1c132d1fde5f21b505f4))
* **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0))
* **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40))
* **celestia:** use fixed delay in repeat attempts ([#753](https://github.com/dymensionxyz/dymint/issues/753)) ([53002b0](https://github.com/dymensionxyz/dymint/commit/53002b0a070743811295a98580ba038cac40cc7d))
* **code standards:** renames error -> err in celestia ([#768](https://github.com/dymensionxyz/dymint/issues/768)) ([1189384](https://github.com/dymensionxyz/dymint/commit/1189384d1225b3dd65481c9dedbae423e4f8ac04))
* **da:** fixed da path seperator and encoding issue ([#731](https://github.com/dymensionxyz/dymint/issues/731)) ([3a3b219](https://github.com/dymensionxyz/dymint/commit/3a3b21932750fee7eaaa9c186f78e36e3e597746))
* **DA:** use expo backoff in retries ([#739](https://github.com/dymensionxyz/dymint/issues/739)) ([848085f](https://github.com/dymensionxyz/dymint/commit/848085f70bcaae81fb80da3ab78c4d8b399e13b1))
* **doc:** manager cache comment ([#767](https://github.com/dymensionxyz/dymint/issues/767)) ([b88bf6e](https://github.com/dymensionxyz/dymint/commit/b88bf6e72820c944b290147724255cc8466ada50))
* **logging:** added reason for websocket closed debug msg ([#746](https://github.com/dymensionxyz/dymint/issues/746)) ([3aa7d80](https://github.com/dymensionxyz/dymint/commit/3aa7d80ace92b3b0f79e4f338f10bb94c96ab6dd))
* **logs:** make logs more readable in a couple places, fix race cond ([#749](https://github.com/dymensionxyz/dymint/issues/749)) ([f05ef39](https://github.com/dymensionxyz/dymint/commit/f05ef3957b754c05fbc90aa39eabce80bbe65933))
* **p2p:** validate block before applying and not before caching in p2p gossiping ([#723](https://github.com/dymensionxyz/dymint/issues/723)) ([98371b5](https://github.com/dymensionxyz/dymint/commit/98371b5220613e70f3274fab5593e02ba532f7db))
* **produce loop:** handle unauthenticated error in settlement layer ([#726](https://github.com/dymensionxyz/dymint/issues/726)) ([33e78d1](https://github.com/dymensionxyz/dymint/commit/33e78d116b5f14b91b8b3bda2b6cbfee9040e2d3))
* **rpc:** nil panic in rpc/json/handler.go WriteError ([#750](https://github.com/dymensionxyz/dymint/issues/750)) ([e09709b](https://github.com/dymensionxyz/dymint/commit/e09709b428a33da002defb9f13178fa19b81a69b))
* **settlement:** remove state index from proto ([#777](https://github.com/dymensionxyz/dymint/issues/777)) ([767b8fd](https://github.com/dymensionxyz/dymint/commit/767b8fdb490c37deee43ac023688410bbb98ccb0))
* **sync:** make sure we use a latest state index as a start point ([#760](https://github.com/dymensionxyz/dymint/issues/760)) ([43e2d96](https://github.com/dymensionxyz/dymint/commit/43e2d965f2b505751f8e5260549e909c976141ee))
* **tests:** fix unit tests, mocks, cleanup/dry hub queries ([#782](https://github.com/dymensionxyz/dymint/issues/782)) ([c276aea](https://github.com/dymensionxyz/dymint/commit/c276aea12c9cd37f62fcf9d684c4efe901a510bf))


Check failure on line 23 in CHANGELOG.md

View workflow job for this annotation

GitHub Actions / markdownlint

Multiple consecutive blank lines [Expected: 1; Actual: 2]
### Features

* **produce:** limiting block size by maxBatchSize ([#784](https://github.com/dymensionxyz/dymint/issues/784)) ([f90042c](https://github.com/dymensionxyz/dymint/commit/f90042cd61fc6b60093478cd65491f8aa1106457))


Check failure on line 28 in CHANGELOG.md

View workflow job for this annotation

GitHub Actions / markdownlint

Multiple consecutive blank lines [Expected: 1; Actual: 2]

Check failure on line 29 in CHANGELOG.md

View workflow job for this annotation

GitHub Actions / markdownlint

Multiple consecutive blank lines [Expected: 1; Actual: 3]
Expand Down
10 changes: 5 additions & 5 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, validators []*tmtypes.
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state types.State) *types.Block {
maxBytes := state.ConsensusParams.Block.MaxBytes
maxGas := state.ConsensusParams.Block.MaxGas

mempoolTxs := e.mempool.ReapMaxBytesMaxGas(maxBytes, maxGas)
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash [32]byte, state types.State, maxBytes uint64) *types.Block {
if state.ConsensusParams.Block.MaxBytes > 0 {
maxBytes = min(maxBytes, uint64(state.ConsensusParams.Block.MaxBytes))
}
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBytes), state.ConsensusParams.Block.MaxGas)

block := &types.Block{
Header: types.Header{
Expand Down
27 changes: 16 additions & 11 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (

"github.com/dymensionxyz/dymint/mempool"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/mocks"
tmmocks "github.com/dymensionxyz/dymint/mocks/github.com/tendermint/tendermint/abci/types"
tmmocksproxy "github.com/dymensionxyz/dymint/mocks/github.com/tendermint/tendermint/proxy"

"github.com/dymensionxyz/dymint/types"
)

Expand All @@ -32,7 +34,7 @@ func TestCreateBlock(t *testing.T) {

logger := log.TestingLogger()

app := &mocks.Application{}
app := &tmmocks.MockApplication{}
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})

clientCreator := proxy.NewLocalClientCreator(app)
Expand All @@ -47,21 +49,23 @@ func TestCreateBlock(t *testing.T) {
executor, err := block.NewExecutor([]byte("test address"), nsID, "test", mpool, proxy.NewAppConns(clientCreator), nil, logger)
assert.NoError(err)

maxBytes := uint64(100)

state := types.State{}
state.ConsensusParams.Block.MaxBytes = 100
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000
state.Validators = tmtypes.NewValidatorSet(nil)

// empty block
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, state)
block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, state, maxBytes)
require.NotNil(block)
assert.Empty(block.Data.Txs)
assert.Equal(uint64(1), block.Header.Height)

// one small Tx
err = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block = executor.CreateBlock(2, &types.Commit{}, [32]byte{}, state)
block = executor.CreateBlock(2, &types.Commit{}, [32]byte{}, state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 1)
Expand All @@ -71,7 +75,7 @@ func TestCreateBlock(t *testing.T) {
require.NoError(err)
err = mpool.CheckTx(make([]byte, 100), func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block = executor.CreateBlock(3, &types.Commit{}, [32]byte{}, state)
block = executor.CreateBlock(3, &types.Commit{}, [32]byte{}, state, maxBytes)
require.NotNil(block)
assert.Len(block.Data.Txs, 2)
}
Expand All @@ -83,7 +87,7 @@ func TestApplyBlock(t *testing.T) {
logger := log.TestingLogger()

// Mock ABCI app
app := &mocks.Application{}
app := &tmmocks.MockApplication{}
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{})
app.On("DeliverTx", mock.Anything).Return(abci.ResponseDeliverTx{})
Expand Down Expand Up @@ -115,7 +119,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(eventBus.Start())

// Mock app connections
appConns := &mocks.AppConns{}
appConns := &tmmocksproxy.MockAppConns{}
appConns.On("Consensus").Return(abciClient)
appConns.On("Query").Return(abciClient)
executor, err := block.NewExecutor([]byte("test address"), nsID, chainID, mpool, appConns, eventBus, logger)
Expand Down Expand Up @@ -143,13 +147,14 @@ func TestApplyBlock(t *testing.T) {
}
state.InitialHeight = 1
state.LastBlockHeight = 0
state.ConsensusParams.Block.MaxBytes = 100
maxBytes := uint64(100)
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000

// Create first block with one Tx from mempool
_ = mpool.CheckTx([]byte{1, 2, 3, 4}, func(r *abci.Response) {}, mempool.TxInfo{})
require.NoError(err)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{}, state)
block := executor.CreateBlock(1, &types.Commit{Height: 0}, [32]byte{}, state, maxBytes)
require.NotNil(block)
assert.Equal(uint64(1), block.Header.Height)
assert.Len(block.Data.Txs, 1)
Expand Down Expand Up @@ -191,7 +196,7 @@ func TestApplyBlock(t *testing.T) {
require.NoError(mpool.CheckTx([]byte{5, 6, 7, 8, 9}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx([]byte{1, 2, 3, 4, 5}, func(r *abci.Response) {}, mempool.TxInfo{}))
require.NoError(mpool.CheckTx(make([]byte, 90), func(r *abci.Response) {}, mempool.TxInfo{}))
block = executor.CreateBlock(2, commit, [32]byte{}, newState)
block = executor.CreateBlock(2, commit, [32]byte{}, newState, maxBytes)
require.NotNil(block)
assert.Equal(uint64(2), block.Header.Height)
assert.Len(block.Data.Txs, 3)
Expand Down
56 changes: 27 additions & 29 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

"github.com/dymensionxyz/dymint/gerr"

uevent "github.com/dymensionxyz/dymint/utils/event"

"code.cloudfoundry.org/go-diodes"
Expand Down Expand Up @@ -86,7 +88,8 @@ type Manager struct {

logger types.Logger

// Cached blocks and commits for applying at future heights. Invariant: the block and commit are .Valid() (validated sigs etc)
// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
// we can only do full validation in sequential order.
blockCache map[uint64]CachedBlock
}

Expand Down Expand Up @@ -167,18 +170,20 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
}
}

if !isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyGossipedBlocksLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger)
}

err := m.syncBlockManager()
if err != nil {
err = fmt.Errorf("sync block manager: %w", err)
return err
return fmt.Errorf("sync block manager: %w", err)
}

if isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
go uevent.MustSubscribe(ctx, m.Pubsub, "applyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.onNewGossipedBlock, m.logger, 100)
go m.RetrieveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
Expand All @@ -188,26 +193,25 @@ func (m *Manager) Start(ctx context.Context, isAggregator bool) error {

// syncBlockManager enforces the node to be synced on initial run.
func (m *Manager) syncBlockManager() error {
resultRetrieveBatch, err := m.getLatestBatchFromSL()
// Set the syncTarget according to the result
res, err := m.SLClient.RetrieveBatch()
if errors.Is(err, gerr.ErrNotFound) {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL. Start writing first batch.")
m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
if errors.Is(err, settlement.ErrBatchNotFound) {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
m.SyncTarget.Store(uint64(m.Genesis.InitialHeight - 1))
return nil
}
return err
}
m.SyncTarget.Store(resultRetrieveBatch.EndHeight)
err = m.syncUntilTarget(resultRetrieveBatch.EndHeight)
// Set the syncTarget according to the result
m.SyncTarget.Store(res.EndHeight)
err = m.syncUntilTarget(res.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced", "current height", m.Store.Height(), "syncTarget", m.SyncTarget.Load())
m.logger.Info("Synced.", "current height", m.Store.Height(), "syncTarget", m.SyncTarget.Load())
return nil
}

Expand All @@ -229,14 +233,15 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {

func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
eventData := event.Data().(*events.DataHealthStatus)
m.logger.Info("received health status event", "eventData", eventData)
m.logger.Info("Received node health status event.", "eventData", eventData)
m.shouldProduceBlocksCh <- eventData.Error == nil
}

// TODO: move to gossip.go
// onNewGossippedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.logger.Debug("Received new block event", "eventData", event.Data(), "cachedBlocks", len(m.blockCache))
m.retrieverMutex.Lock() // needed to protect blockCache access
m.logger.Debug("Received new block via gossip", "n cachedBlocks", len(m.blockCache))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
Expand All @@ -249,20 +254,13 @@ func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
}
m.logger.Debug("caching block", "block height", block.Header.Height, "store height", m.Store.Height())
}

if block.Header.Height == nextHeight {
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying cached blocks", "err", err)
}
m.retrieverMutex.Unlock() // have to give this up as it's locked again in attempt apply, and we're not re-entrant
err := m.attemptApplyCachedBlocks()
if err != nil {
m.logger.Error("applying cached blocks", "err", err)
}
}

// getLatestBatchFromSL gets the latest batch from the SL
func (m *Manager) getLatestBatchFromSL() (*settlement.ResultRetrieveBatch, error) {
return m.SLClient.RetrieveBatch()
}

// getInitialState tries to load lastState from Store, and if it's not available it reads GenesisDoc.
func getInitialState(store store.Store, genesis *tmtypes.GenesisDoc, logger types.Logger) (types.State, error) {
s, err := store.LoadState()
Expand Down
8 changes: 2 additions & 6 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func TestInitialState(t *testing.T) {

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {

dalc := testutil.GetMockDALC(logger)
agg, err := block.NewManager(key, conf, c.genesis, c.store, nil, proxyApp, dalc, settlementlc,
nil, pubsubServer, p2pClient, logger)
Expand All @@ -109,7 +108,7 @@ func TestInitialState(t *testing.T) {
}

// TestProduceOnlyAfterSynced should test that we are resuming publishing blocks after we are synced
// 1. Submit a batch and outsync the manager
// 1. Submit a batch and desync the manager
// 2. Fail to produce blocks
// 2. Sync the manager
// 3. Succeed to produce blocks
Expand Down Expand Up @@ -148,8 +147,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
go func() {
errChan <- manager.Start(ctx, true)
err := <-errChan
// Check for error from manager.Start.
assert.NoError(t, err, "Manager start should not produce an error")
assert.NoError(t, err)
}()
<-ctx.Done()
assert.Equal(t, batch.EndHeight, manager.SyncTarget.Load())
Expand All @@ -162,7 +160,6 @@ func TestRetrieveDaBatchesFailed(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, manager)

t.Log(manager.LastState.SLStateIndex)
daMetaData := &da.DASubmitMetaData{
Client: da.Mock,
Height: 1,
Expand Down Expand Up @@ -472,7 +469,6 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
}

func TestDAFetch(t *testing.T) {

require := require.New(t)
// Setup app
app := testutil.GetAppMock(testutil.Info, testutil.Commit)
Expand Down
4 changes: 2 additions & 2 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
m.logger.Info("block production paused - awaiting positive continuation signal")
shouldProduceBlocks = <-m.shouldProduceBlocksCh
}
m.logger.Info("resumed block resumed")
m.logger.Info("resumed block production")
}
}
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
} else if !errors.Is(err, store.ErrKeyNotFound) {
return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
} else {
block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.LastState)
block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.LastState, m.Conf.BlockBatchMaxSizeBytes)
if !allowEmpty && len(block.Data.Txs) == 0 {
return nil, nil, fmt.Errorf("%w: %w", types.ErrSkippedEmptyBlock, ErrRecoverable)
}
Expand Down
Loading

0 comments on commit b9c38c5

Please sign in to comment.