Skip to content

Commit

Permalink
store last settlement block time on store
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Nov 4, 2024
1 parent 8d47283 commit 120347e
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 149 deletions.
6 changes: 6 additions & 0 deletions block/initchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package block
import (
"context"
"errors"
"time"

tmtypes "github.com/tendermint/tendermint/types"
)
Expand All @@ -24,5 +25,10 @@ func (m *Manager) RunInitChain(ctx context.Context) error {
if _, err := m.Store.SaveState(m.State, nil); err != nil {
return err
}

err = m.SetLastSettlementBlockTime(time.Now())
if err != nil {
return err
}
return nil
}
28 changes: 27 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/dymensionxyz/gerr-cosmos/gerrc"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -274,7 +275,6 @@ func (m *Manager) updateFromLastSettlementState() error {
return err
}

// update last block submitted time from last batch when starting the node
m.LastSettlementHeight.Store(latestHeight)

if latestHeight >= m.State.NextHeight() {
Expand Down Expand Up @@ -356,3 +356,29 @@ func (m *Manager) setDA(daconfig string, dalcKV store.KV, logger log.Logger) err
func (m *Manager) setFraudHandler(handler *FreezeHandler) {
m.FraudHandler = handler
}

// SetLastSettlementBlockTime saves the last block in SL timestamp
func (m *Manager) SetLastSettlementBlockTime(time time.Time) error {
_, err := m.Store.SaveLastSettlementBlockTime(uint64(time.UTC().UnixNano()), nil)
if err != nil {
return err
}
return nil
}

// GetLastSettlementBlockTime returns the last block in SL timestamp
func (m *Manager) GetLastSettlementBlockTime() (time.Time, error) {
lastSettlementBlockTime, err := m.Store.LoadLastSettlementBlockTime()
if err != nil {
return time.Time{}, err
}
return time.Unix(0, int64(lastSettlementBlockTime)), nil
}

func (m *Manager) GetSkewTime() (time.Duration, error) {
lastSettlementBlockTime, err := m.GetLastSettlementBlockTime()
if err != nil {
return time.Duration(0), err
}
return m.State.GetLastBlockTime().Sub(lastSettlementBlockTime), nil
}
4 changes: 0 additions & 4 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"errors"
"fmt"
"time"

errorsmod "cosmossdk.io/errors"

Expand Down Expand Up @@ -111,9 +110,6 @@ func (e *Executor) UpdateStateAfterInitChain(s *types.State, res *abci.ResponseI
}
// We update the last results hash with the empty hash, to conform with RFC-6962.
copy(s.LastResultsHash[:], merkle.HashFromByteSlices(nil))

// we init the last submitted block time, to be able go calculate max skew time before first batch is submitted
s.SetLastSubmittedBlockTime(time.Now())
}

func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
Expand Down
27 changes: 19 additions & 8 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (m *Manager) SubmitLoop(ctx context.Context,
bytesProduced,
m.Conf.BatchSkew,
m.GetUnsubmittedBlocks,
m.State.GetSkewTime,
m.GetSkewTime,
m.Conf.BatchSubmitTime,
m.Conf.BatchSubmitBytes,
m.CreateAndSubmitBatchGetSizeBlocksCommits,
Expand All @@ -46,7 +46,7 @@ func SubmitLoopInner(
bytesProduced chan int, // a channel of block and commit bytes produced
maxBatchSkew time.Duration, // max number of blocks that submitter is allowed to have pending
unsubmittedBlocksNum func() uint64,
skewTime func() time.Duration,
skewTime func() (time.Duration, error),
maxBatchTime time.Duration, // max time to allow between batches
maxBatchBytes uint64, // max size of serialised batch in bytes
createAndSubmitBatch func(maxSizeBytes uint64) (sizeBlocksCommits uint64, err error),
Expand All @@ -72,11 +72,16 @@ func SubmitLoopInner(

types.RollappPendingSubmissionsSkewBytes.Set(float64(pendingBytes.Load()))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))

skewTime, err := skewTime()
if err != nil {
return err
}
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))

submitter.Nudge()

if maxBatchSkew < skewTime() {
if maxBatchSkew < skewTime {
// too much stuff is pending submission
// we block here until we get a progress nudge from the submitter thread
select {
Expand All @@ -101,16 +106,20 @@ func SubmitLoopInner(
}
pending := pendingBytes.Load()

skewTime, err := skewTime()
if err != nil {
return err
}
types.RollappPendingSubmissionsSkewBytes.Set(float64(pending))
types.RollappPendingSubmissionsSkewBlocks.Set(float64(unsubmittedBlocksNum()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime().Hours()))
types.RollappPendingSubmissionsSkewTimeHours.Set(float64(skewTime.Hours()))

// while there are accumulated blocks, create and submit batches!!
for {
done := ctx.Err() != nil
nothingToSubmit := pending == 0

lastSubmissionIsRecent := skewTime() < maxBatchTime
lastSubmissionIsRecent := skewTime < maxBatchTime
maxDataNotExceeded := pending <= maxBatchBytes
if done || nothingToSubmit || (lastSubmissionIsRecent && maxDataNotExceeded) {
break
Expand Down Expand Up @@ -248,9 +257,11 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {

types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSettlementHeight.Store(batch.EndHeight())

// update last submitted block time with batch last block (used to calculate max skew time)
m.State.SetLastSubmittedBlockTime(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp())
return nil
err = m.SetLastSettlementBlockTime(batch.Blocks[len(batch.Blocks)-1].Header.GetTimestamp())

return err
}

// GetUnsubmittedBytes returns the total number of unsubmitted bytes produced an element on a channel
Expand Down
12 changes: 7 additions & 5 deletions block/submit_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func testSubmitLoopInner(
lastProducedBlockTime.Store(uint64(time.Now().UTC().UnixNano()))
lastSubmittedBlockTime.Store(uint64(time.Now().UTC().UnixNano()))

skewTime := func() time.Duration {
skewTime := func() (time.Duration, error) {

lastSubmitted := time.Unix(0, int64(lastSubmittedBlockTime.Load()))
lastProduced := time.Unix(0, int64(lastProducedBlockTime.Load()))
if lastProduced.Before(lastSubmitted) {
return 0
return 0, nil
}
return lastProduced.Sub(lastSubmitted)
return lastProduced.Sub(lastSubmitted), nil
}
go func() { // simulate block production
go func() { // another thread to check system properties
Expand All @@ -83,7 +83,8 @@ func testSubmitLoopInner(
default:
}
// producer shall not get too far ahead
require.True(t, skewTime() < args.batchSkew+args.skewMargin, "last produced blocks time not less than maximum skew time", "produced block skew time", skewTime(), "max skew time", args.batchSkew)
skewTime, _ := skewTime()
require.True(t, skewTime < args.batchSkew+args.skewMargin, "last produced blocks time not less than maximum skew time", "produced block skew time", skewTime, "max skew time", args.batchSkew)
}
}()
for {
Expand All @@ -95,7 +96,8 @@ func testSubmitLoopInner(

time.Sleep(approx(args.produceTime))

if args.batchSkew <= skewTime() {
skewTime, _ := skewTime()
if args.batchSkew <= skewTime {
continue
}
nBytes := rand.Intn(args.produceBytes) // simulate block production
Expand Down
4 changes: 2 additions & 2 deletions block/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestSubmissionByTime(t *testing.T) {
manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)

manager.State.SetLastSubmittedBlockTime(time.Now())
manager.SetLastSettlementBlockTime(time.Now())
// Check initial height
initialHeight := uint64(0)
require.Equal(initialHeight, manager.State.Height())
Expand Down Expand Up @@ -332,7 +332,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
managerConfig.BatchSubmitBytes = c.blockBatchMaxSizeBytes
manager, err := testutil.GetManager(managerConfig, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
manager.State.SetLastSubmittedBlockTime(time.Now())
manager.SetLastSettlementBlockTime(time.Now())

manager.DAClient = testutil.GetMockDALC(log.TestingLogger())
manager.Retriever = manager.DAClient.(da.BatchRetriever)
Expand Down
3 changes: 3 additions & 0 deletions block/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (m *Manager) SettlementSyncLoop(ctx context.Context) error {
}
m.logger.Info("Retrieved state update from SL.", "state_index", settlementBatch.StateIndex)

lastBlockTimestamp := settlementBatch.BlockDescriptors[len(settlementBatch.BlockDescriptors)-1].GetTimestamp()
m.SetLastSettlementBlockTime(lastBlockTimestamp)

err = m.ApplyBatchFromSL(settlementBatch.Batch)
if err != nil {
m.logger.Error("process next DA batch", "err", err)
Expand Down
3 changes: 0 additions & 3 deletions block/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ func (m *Manager) SettlementValidateLoop(ctx context.Context) error {
return err
}

lastBlockTimestamp := batch.BlockDescriptors[len(batch.BlockDescriptors)-1].GetTimestamp()
m.State.SetLastSubmittedBlockTime(lastBlockTimestamp)

// validate batch
err = m.SettlementValidator.ValidateStateUpdate(batch)
if err != nil {
Expand Down
3 changes: 0 additions & 3 deletions proto/types/dymint/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ message State {

// Proposer is a sequencer that acts as a proposer. Can be nil if no proposer is set.
Sequencer proposer = 20;

uint64 last_submitted_block_time = 21;

}

//rollapp params defined in genesis and updated via gov proposal
Expand Down
43 changes: 33 additions & 10 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,17 @@ import (
)

var (
blockPrefix = [1]byte{1}
indexPrefix = [1]byte{2}
commitPrefix = [1]byte{3}
statePrefix = [1]byte{4}
responsesPrefix = [1]byte{5}
proposerPrefix = [1]byte{6}
cidPrefix = [1]byte{7}
sourcePrefix = [1]byte{8}
validatedHeightPrefix = [1]byte{9}
drsVersionPrefix = [1]byte{10}
blockPrefix = [1]byte{1}
indexPrefix = [1]byte{2}
commitPrefix = [1]byte{3}
statePrefix = [1]byte{4}
responsesPrefix = [1]byte{5}
proposerPrefix = [1]byte{6}
cidPrefix = [1]byte{7}
sourcePrefix = [1]byte{8}
validatedHeightPrefix = [1]byte{9}
drsVersionPrefix = [1]byte{10}
lastSettlementBlockTimePrefix = [1]byte{11}
)

// DefaultStore is a default store implementation.
Expand Down Expand Up @@ -335,6 +336,24 @@ func (s *DefaultStore) SaveDRSVersion(height uint64, version uint32, batch KVBat
return batch, err
}

func (s *DefaultStore) SaveLastSettlementBlockTime(time uint64, batch KVBatch) (KVBatch, error) {
b := make([]byte, 8)
binary.LittleEndian.PutUint64(b, time)
if batch == nil {
return nil, s.db.Set(getLastSettlementBlockTimeKey(), b)
}
err := batch.Set(getLastSettlementBlockTimeKey(), b)
return batch, err
}

func (s *DefaultStore) LoadLastSettlementBlockTime() (uint64, error) {
b, err := s.db.Get(getLastSettlementBlockTimeKey())
if err != nil {
return 0, err
}
return binary.LittleEndian.Uint64(b), nil
}

func getBlockKey(hash [32]byte) []byte {
return append(blockPrefix[:], hash[:]...)
}
Expand Down Expand Up @@ -386,3 +405,7 @@ func getProposerKey(height uint64) []byte {
binary.BigEndian.PutUint64(buf, height)
return append(proposerPrefix[:], buf[:]...)
}

func getLastSettlementBlockTimeKey() []byte {
return lastSettlementBlockTimePrefix[:]
}
4 changes: 4 additions & 0 deletions store/storeIface.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,9 @@ type Store interface {

SaveDRSVersion(height uint64, version uint32, batch KVBatch) (KVBatch, error)

LoadLastSettlementBlockTime() (uint64, error)

SaveLastSettlementBlockTime(height uint64, batch KVBatch) (KVBatch, error)

Close() error
}
Loading

0 comments on commit 120347e

Please sign in to comment.