Skip to content

Commit

Permalink
fix + test
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Nov 8, 2024
1 parent 306f6e4 commit a48066b
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 84 deletions.
2 changes: 1 addition & 1 deletion block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (e *Executor) ExecuteBlock(block *types.Block) (*tmstate.ABCIResponses, err
Votes: nil,
},
ByzantineValidators: nil,
ConsensusMessages: block.Data.ConsensusMessages,
//ConsensusMessages: block.Data.ConsensusMessages,
})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (e *Executor) UpdateStateAfterInitChain(s *types.State, res *abci.ResponseI
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
copy(s.LastResultsHash[:], merkle.HashFromByteSlices(nil))
s.LastSubmittedBlockTime = time.Now()
}

func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
Expand Down
27 changes: 6 additions & 21 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,
bytesProduced,
m.Conf.BatchSkew,
m.GetUnsubmittedBlocks,
m.GetTimeSkew,
m.State.GetSkewTime,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -46,7 +46,7 @@ func SubmitLoopInner(
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew time.Duration, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocksNum func() uint64,
unsubmittedBlocksTime func() time.Duration,
skewTime func() time.Duration,
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
Expand All @@ -62,8 +62,7 @@ func SubmitLoopInner(
// 'trigger': this thread is responsible for waking up the submitter when a new block arrives, and back-pressures the block production loop
// if it gets too far ahead.
for {
skewTime := unsubmittedBlocksTime()
if maxBatchSkew < skewTime {
if maxBatchSkew < skewTime() {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
Expand All @@ -83,7 +82,7 @@ func SubmitLoopInner(

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

submitter.Nudge()
}
Expand All @@ -100,19 +99,17 @@ func SubmitLoopInner(
case <-submitter.C:
}
pending := pendingBytes.Load()
skewTime := unsubmittedBlocksTime()

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0
skewTime := unsubmittedBlocksTime()

lastSubmissionIsRecent := skewTime < maxBatchTime
lastSubmissionIsRecent := skewTime() < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
Expand Down Expand Up @@ -288,18 +285,6 @@ func (m *Manager) GetUnsubmittedBlocks() uint64 {
return m.State.Height() - m.LastSettlementHeight.Load()
}

func (m *Manager) GetTimeSkew() time.Duration {
currentBlock, err := m.Store.LoadBlock(m.State.Height())
if err != nil {
return time.Duration(0)
}
lastSubmittedBlock, err := m.Store.LoadBlock(m.LastSubmittedHeight.Load())
if err != nil {
return time.Duration(0)
}
return currentBlock.Header.GetTimestamp().Sub(lastSubmittedBlock.Header.GetTimestamp())
}

// UpdateLastSubmittedHeight will update last height submitted height upon events.
// This may be necessary in case we crashed/restarted before getting response for our submission to the settlement layer.
func (m *Manager) UpdateLastSubmittedHeight(event pubsub.Message) {
Expand Down
2 changes: 2 additions & 0 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func TestSubmissionByTime(t *testing.T) {
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.State.LastSubmittedBlockTime = time.Now()
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
Expand Down Expand Up @@ -331,6 +332,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.State.LastSubmittedBlockTime = time.Now()

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
Expand Down
3 changes: 3 additions & 0 deletions proto/types/dymint/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ message State {

// Proposer is a sequencer that acts as a proposer. Can be nil if no proposer is set.
Sequencer proposer = 20;

google.protobuf.Timestamp last_submitted_block_time = 21 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];

}

//rollapp params defined in genesis and updated via gov proposal
Expand Down
155 changes: 105 additions & 50 deletions types/pb/dymint/state.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 13 additions & 12 deletions types/serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,17 +261,18 @@ func (s *State) ToProto() (*pb.State, error) {
}

return &pb.State{
Version: &s.Version,
ChainId: s.ChainID,
InitialHeight: int64(s.InitialHeight),
LastBlockHeight: int64(s.Height()),
LastBlockTime: s.LastBlockTime,
ConsensusParams: s.ConsensusParams,
LastResultsHash: s.LastResultsHash[:],
LastHeaderHash: s.LastHeaderHash[:],
AppHash: s.AppHash[:],
RollappParams: s.RollappParams,
Proposer: proposerProto,
Version: &s.Version,
ChainId: s.ChainID,
InitialHeight: int64(s.InitialHeight),
LastBlockHeight: int64(s.Height()),
LastBlockTime: s.LastBlockTime,
ConsensusParams: s.ConsensusParams,
LastResultsHash: s.LastResultsHash[:],
LastHeaderHash: s.LastHeaderHash[:],
AppHash: s.AppHash[:],
RollappParams: s.RollappParams,
Proposer: proposerProto,
LastSubmittedBlockTime: s.LastSubmittedBlockTime,
}, nil
}

Expand All @@ -298,7 +299,7 @@ func (s *State) FromProto(other *pb.State) error {
copy(s.AppHash[:], other.AppHash)
s.RollappParams = other.RollappParams
s.LastBlockTime = other.LastBlockTime

s.LastSubmittedBlockTime = other.LastSubmittedBlockTime
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions types/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (s *State) NextHeight() uint64 {
}

func (s *State) GetSkewTime() time.Duration {
if s.LastBlockTime.Before(s.LastSubmittedBlockTime) {
return 0
}
return s.LastBlockTime.Sub(s.LastSubmittedBlockTime)
}

Expand Down

0 comments on commit a48066b

Please sign in to comment.