Skip to content

Commit

Permalink
Merge branch 'main' into mtsitrin/1236-err-when-createandpostlastbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
mtsitrin committed Nov 21, 2024
2 parents eae6ca6 + 34018b9 commit d68d0bc
Show file tree
Hide file tree
Showing 32 changed files with 451 additions and 220 deletions.
4 changes: 3 additions & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,14 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
m.logger.Debug("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

// Update the state with the new app hash, and store height from the commit.
// Every one of those, if happens before commit, prevents us from re-executing the block in case failed during commit.
m.Executor.UpdateStateAfterCommit(m.State, responses, appHash, block.Header.Height, block.Header.Hash())

}

// save last block time used to calculate batch skew time
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
// Update the store:
// 1. Save the proposer for the current height to the store.
// 2. Update the proposer in the state in case of rotation.
Expand Down
1 change: 0 additions & 1 deletion block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ func (e *Executor) CreateBlock(
copy(block.Header.DataHash[:], types.GetDataHash(block))
copy(block.Header.SequencerHash[:], state.GetProposerHash())
copy(block.Header.NextSequencersHash[:], nextSeqHash[:])

return block
}

Expand Down
32 changes: 18 additions & 14 deletions block/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"time"

authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
sequencers "github.com/dymensionxyz/dymension-rdk/x/sequencers/types"
"github.com/dymensionxyz/gerr-cosmos/gerrc"
"github.com/gogo/protobuf/proto"

sequencers "github.com/dymensionxyz/dymension-rdk/x/sequencers/types"
"github.com/dymensionxyz/dymint/types"

"github.com/dymensionxyz/dymint/version"
)

Expand Down Expand Up @@ -50,29 +49,33 @@ func (m *Manager) checkForkUpdate(msg string) error {
return err
}

if m.shouldStopNode(rollapp, m.State.GetRevision()) {
err = m.createInstruction(rollapp)
var (
nextHeight = m.State.NextHeight()
actualRevision = m.State.GetRevision()
expectedRevision = rollapp.GetRevisionForHeight(nextHeight)
)
if shouldStopNode(expectedRevision, nextHeight, actualRevision) {
err = m.createInstruction(expectedRevision)
if err != nil {
return err
}

m.freezeNode(fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), rollapp.LatestRevision().StartHeight, m.State.GetRevision(), rollapp.LatestRevision().Number))
m.freezeNode(fmt.Errorf("%s local_block_height: %d rollapp_revision_start_height: %d local_revision: %d rollapp_revision: %d", msg, m.State.Height(), expectedRevision.StartHeight, actualRevision, expectedRevision.Number))
}

return nil
}

// createInstruction writes file to disk with fork information
func (m *Manager) createInstruction(rollapp *types.Rollapp) error {
func (m *Manager) createInstruction(expectedRevision types.Revision) error {
obsoleteDrs, err := m.SLClient.GetObsoleteDrs()
if err != nil {
return err
}

revision := rollapp.LatestRevision()
instruction := types.Instruction{
Revision: revision.Number,
RevisionStartHeight: revision.StartHeight,
Revision: expectedRevision.Number,
RevisionStartHeight: expectedRevision.StartHeight,
FaultyDRS: obsoleteDrs,
}

Expand All @@ -89,11 +92,12 @@ func (m *Manager) createInstruction(rollapp *types.Rollapp) error {
// This method checks two conditions to decide if a node should be stopped:
// 1. If the next state height is greater than or equal to the rollapp's revision start height.
// 2. If the block's app version (equivalent to revision) is less than the rollapp's revision
func (m *Manager) shouldStopNode(rollapp *types.Rollapp, revision uint64) bool {
if m.State.NextHeight() >= rollapp.LatestRevision().StartHeight && revision < rollapp.LatestRevision().Number {
return true
}
return false
func shouldStopNode(
expectedRevision types.Revision,
nextHeight uint64,
actualRevisionNumber uint64,
) bool {
return nextHeight >= expectedRevision.StartHeight && actualRevisionNumber < expectedRevision.Number
}

// forkNeeded returns true if the fork file exists
Expand Down
16 changes: 4 additions & 12 deletions block/fork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,8 @@ func TestShouldStopNode(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
state := &types.State{}
state.LastBlockHeight.Store(tt.height)

logger := log.NewNopLogger()

manager := &Manager{
State: state,
logger: logger,
}

result := manager.shouldStopNode(tt.rollapp, tt.block.Header.Version.App)
expectedRevision := tt.rollapp.GetRevisionForHeight(tt.height)
result := shouldStopNode(expectedRevision, tt.height, tt.block.Header.Version.App)
assert.Equal(t, tt.expected, result)
})
}
Expand Down Expand Up @@ -257,7 +248,8 @@ func TestCreateInstruction(t *testing.T) {
}, nil)

manager.SLClient = mockSL
err := manager.createInstruction(tt.rollapp)
expectedRevision := tt.rollapp.GetRevisionForHeight(tt.block.Header.Height)
err := manager.createInstruction(expectedRevision)
if tt.expectedError {
assert.Error(t, err)
} else {
Expand Down
2 changes: 2 additions & 0 deletions block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
if err != nil {
return err
}

// update the state with only the consensus pubkey
m.Executor.UpdateStateAfterInitChain(m.State, res)
m.Executor.UpdateMempoolAfterInitChain(m.State)
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
}

return nil
}
30 changes: 27 additions & 3 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ type Manager struct {
// context used when freezing node
Cancel context.CancelFunc
Ctx context.Context

// LastBlockTimeInSettlement is the time of last submitted block, used to measure batch skew time
LastBlockTimeInSettlement atomic.Int64

// LastBlockTime is the time of last produced block, used to measure batch skew time
LastBlockTime atomic.Int64
/*
Sequencer and full-node
*/
Expand Down Expand Up @@ -299,20 +305,28 @@ func (m *Manager) updateFromLastSettlementState() error {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSettlementHeight.Store(uint64(m.Genesis.InitialHeight - 1))
m.LastBlockTimeInSettlement.Store(m.Genesis.GenesisTime.UTC().UnixNano())
return nil
}

if err != nil {
// TODO: separate between fresh rollapp and non-registered rollapp
return err
}

m.LastSettlementHeight.Store(latestHeight)

if latestHeight >= m.State.NextHeight() {
m.UpdateTargetHeight(latestHeight)
}

m.LastSettlementHeight.Store(latestHeight)

// init last block in settlement time in dymint state to calculate batch submit skew time
m.SetLastBlockTimeInSettlementFromHeight(latestHeight)

// init last block time in dymint state to calculate batch submit skew time
block, err := m.Store.LoadBlock(m.State.Height())
if err == nil {
m.LastBlockTime.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
return nil
}

Expand Down Expand Up @@ -398,3 +412,13 @@ func (m *Manager) freezeNode(err error) {
uevent.MustPublish(m.Ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
m.Cancel()
}

// SetLastBlockTimeInSettlementFromHeight is used to initialize LastBlockTimeInSettlement from rollapp height in settlement
func (m *Manager) SetLastBlockTimeInSettlementFromHeight(lastSettlementHeight uint64) {
block, err := m.Store.LoadBlock(lastSettlementHeight)
if err != nil {
// if settlement height block is not found it will be updated after, when syncing
return
}
m.LastBlockTimeInSettlement.Store(block.Header.GetTimestamp().UTC().UnixNano())
}
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.True(t, manager.State.Height() == 0)

// enough time to sync and produce blocks
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
// Capture the error returned by manager.Start.

Expand Down
7 changes: 3 additions & 4 deletions block/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,15 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int)
}

bytesProducedN := block.SizeBytes() + commit.SizeBytes()
m.logger.Info("New block.", "size", uint64(block.ToProto().Size()))

select {
case <-ctx.Done():
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission. " +
"Pausing block production until a signal is consumed.")
m.logger.Error("Pausing block production until new batch is submitted.", "Batch skew time", m.GetBatchSkewTime(), "Max batch skew time", m.Conf.MaxSkewTime)
select {
case <-ctx.Done():
return nil
Expand Down Expand Up @@ -184,7 +183,7 @@ func (m *Manager) produceBlock(opts ProduceBlockOptions) (*types.Block, *types.C
return nil, nil, fmt.Errorf("create commit: %w: %w", err, ErrNonRecoverable)
}

m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs))
m.logger.Info("Block created.", "height", newHeight, "num_tx", len(block.Data.Txs), "size", block.SizeBytes()+commit.SizeBytes())
types.RollappBlockSizeBytesGauge.Set(float64(len(block.Data.Txs)))
types.RollappBlockSizeTxsGauge.Set(float64(len(block.Data.Txs)))
return block, commit, nil
Expand Down
1 change: 0 additions & 1 deletion block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResp
copy(s.LastHeaderHash[:], lastHeaderHash[:])

s.SetHeight(height)

if resp.EndBlock.ConsensusParamUpdates != nil {
s.ConsensusParams.Block.MaxGas = resp.EndBlock.ConsensusParamUpdates.Block.MaxGas
s.ConsensusParams.Block.MaxBytes = resp.EndBlock.ConsensusParamUpdates.Block.MaxBytes
Expand Down
Loading

0 comments on commit d68d0bc

Please sign in to comment.