fix(submission): fix counting and time (#969)
Co-authored-by: zale144 <[email protected]>
danwt and zale144 authored Jul 22, 2024
1 parent 0d3be11 commit 242acb7
Showing 36 changed files with 657 additions and 474 deletions.
37 changes: 13 additions & 24 deletions block/manager.go
@@ -50,11 +50,6 @@ type Manager struct {
DAClient da.DataAvailabilityLayerClient
SLClient settlement.ClientI

/*
Production
*/
producedSizeC chan uint64 // for the producer to report the size of the block+commit it produced

/*
Submission
*/
@@ -114,7 +109,6 @@ func NewManager(
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
targetSyncHeight: diodes.NewOneToOne(1, nil),
producedSizeC: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
}
@@ -163,11 +157,16 @@ func (m *Manager) Start(ctx context.Context) error {
if isSequencer {
// Sequencer must wait till DA is synced to start submitting blobs
<-m.DAClient.Synced()
nBytes := m.GetUnsubmittedBytes()
bytesProducedC := make(chan int)
go func() {
bytesProducedC <- nBytes
}()
eg.Go(func() error {
return m.SubmitLoop(ctx)
return m.SubmitLoop(ctx, bytesProducedC)
})
eg.Go(func() error {
return m.ProduceBlockLoop(ctx)
return m.ProduceBlockLoop(ctx, bytesProducedC)
})
} else {
eg.Go(func() error {
@@ -177,6 +176,12 @@ func (m *Manager) Start(ctx context.Context) error {
return m.SyncToTargetHeightLoop(ctx)
})
}

go func() {
err := eg.Wait()
m.logger.Info("Block manager err group finished.", "err", err)
}()

return nil
}

@@ -220,19 +225,3 @@ func (m *Manager) syncBlockManager() error {
m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
return nil
}

func (m *Manager) MustLoadBlock(h uint64) *types.Block {
ret, err := m.Store.LoadBlock(h)
if err != nil {
panic(fmt.Errorf("store load block: height: %d: %w", h, err))
}
return ret
}

func (m *Manager) MustLoadCommit(h uint64) *types.Commit {
ret, err := m.Store.LoadCommit(h)
if err != nil {
panic(fmt.Errorf("store load commit: height: %d: %w", h, err))
}
return ret
}
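
Aside: a minimal, self-contained sketch of the producer/submitter wiring that Start now sets up. The produce loop reports the size of each block on an unbuffered channel, the submit loop consumes those reports, and the channel is pre-seeded with the bytes already produced but not yet submitted (GetUnsubmittedBytes in the real code). Everything below uses stand-in names and numbers, not the dymint API.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

// produceLoop stands in for ProduceBlockLoop: it reports the size of every
// block it "produces" on bytesProducedC.
func produceLoop(ctx context.Context, bytesProducedC chan<- int) error {
    for i := 0; i < 3; i++ {
        select {
        case <-ctx.Done():
            return nil
        case bytesProducedC <- 100: // pretend each block+commit is 100 bytes
        }
    }
    return nil
}

// submitLoop stands in for SubmitLoop: it accumulates reported bytes and
// "submits" whenever the configured batch size is reached.
func submitLoop(ctx context.Context, bytesProducedC <-chan int, batchSizeBytes int) error {
    total := 0
    for {
        select {
        case <-ctx.Done():
            return nil
        case n := <-bytesProducedC:
            total += n
            if total >= batchSizeBytes {
                fmt.Println("submitting batch of", total, "bytes")
                total = 0
            }
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    // Seed the unbuffered channel with the bytes produced but not yet
    // submitted. Doing it from a goroutine keeps startup from blocking
    // until the submitter is running.
    unsubmittedBytes := 150
    bytesProducedC := make(chan int)
    go func() { bytesProducedC <- unsubmittedBytes }()

    eg, ctx := errgroup.WithContext(ctx)
    eg.Go(func() error { return submitLoop(ctx, bytesProducedC, 200) })
    eg.Go(func() error { return produceLoop(ctx, bytesProducedC) })
    fmt.Println("err group finished:", eg.Wait())
}

Seeding from a goroutine matters because the channel is unbuffered: a direct send in Start would block until SubmitLoop starts receiving.
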
30 changes: 15 additions & 15 deletions block/manager_test.go
@@ -128,7 +128,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.Equal(t, daResultSubmitBatch.Code, da.StatusSuccess)
err = manager.SLClient.SubmitBatch(batch, manager.DAClient.GetClientType(), &daResultSubmitBatch)
require.NoError(t, err)
nextBatchStartHeight = batch.EndHeight + 1
nextBatchStartHeight = batch.EndHeight() + 1
// Wait until daHeight is updated
time.Sleep(time.Millisecond * 500)
}
@@ -148,9 +148,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
assert.NoError(t, err)
}()
<-ctx.Done()
assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load())
assert.Equal(t, batch.EndHeight(), manager.LastSubmittedHeight.Load())
// validate that we produced blocks
assert.Greater(t, manager.State.Height(), batch.EndHeight)
assert.Greater(t, manager.State.Height(), batch.EndHeight())
}

func TestRetrieveDaBatchesFailed(t *testing.T) {
@@ -319,7 +319,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
require.NoError(err)
// Init manager
managerConfig := testutil.GetManagerConfig()
managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
managerConfig.BatchMaxSizeBytes = batchLimitBytes // enough for 2 block, not enough for 10 blocks
manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

@@ -354,23 +354,23 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
// Call createNextDABatch function
startHeight := manager.NextHeightToSubmit()
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight)
batch, err := manager.CreateBatch(manager.Conf.BatchMaxSizeBytes, startHeight, endHeight)
assert.NoError(err)

assert.Equal(batch.StartHeight, startHeight)
assert.LessOrEqual(batch.ToProto().Size(), int(managerConfig.BlockBatchMaxSizeBytes))
assert.Equal(batch.StartHeight(), startHeight)
assert.LessOrEqual(batch.SizeBytes(), int(managerConfig.BatchMaxSizeBytes))

if !tc.expectedToBeTruncated {
assert.Equal(batch.EndHeight, endHeight)
assert.Equal(batch.EndHeight(), endHeight)
} else {
assert.Equal(batch.EndHeight, batch.StartHeight+uint64(len(batch.Blocks))-1)
assert.Less(batch.EndHeight, endHeight)
assert.Equal(batch.EndHeight(), batch.StartHeight()+batch.NumBlocks()-1)
assert.Less(batch.EndHeight(), endHeight)

// validate next added block to batch would have been actually too big
// First relax the byte limit so we could proudce larger batch
manager.Conf.BlockBatchMaxSizeBytes = 10 * manager.Conf.BlockBatchMaxSizeBytes
newBatch, err := manager.CreateNextBatchToSubmit(startHeight, batch.EndHeight+1)
assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)
// First relax the byte limit so we could produce larger batch
manager.Conf.BatchMaxSizeBytes = 10 * manager.Conf.BatchMaxSizeBytes
newBatch, err := manager.CreateBatch(manager.Conf.BatchMaxSizeBytes, startHeight, batch.EndHeight()+1)
assert.Greater(newBatch.SizeBytes(), batchLimitBytes)

assert.NoError(err)
}
@@ -431,7 +431,7 @@ func TestDAFetch(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
app.On("Commit", mock.Anything).Return(abci.ResponseCommit{Data: commitHash[:]}).Once()
app.On("Info", mock.Anything).Return(abci.ResponseInfo{
LastBlockHeight: int64(batch.EndHeight),
LastBlockHeight: int64(batch.EndHeight()),
LastBlockAppHash: commitHash[:],
})
err := manager.ProcessNextDABatch(c.daMetaData)
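
Aside: the assertions above now go through accessor methods (StartHeight(), EndHeight(), NumBlocks(), SizeBytes()) instead of reading struct fields. A hedged sketch of the shape such a Batch type presumably has; the real type in the dymint types package may differ in detail.

package types

// Block is a stand-in; the real block keeps its height in a header and
// derives its size from the proto encoding.
type Block struct {
    Height uint64
    Size   int
}

// Batch groups consecutive blocks for submission.
type Batch struct {
    Blocks []*Block
}

func (b *Batch) StartHeight() uint64 { return b.Blocks[0].Height }
func (b *Batch) EndHeight() uint64   { return b.Blocks[len(b.Blocks)-1].Height }
func (b *Batch) NumBlocks() uint64   { return uint64(len(b.Blocks)) }

func (b *Batch) SizeBytes() int {
    total := 0
    for _, blk := range b.Blocks {
        total += blk.Size
    }
    return total
}
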
54 changes: 31 additions & 23 deletions block/produce.go
@@ -21,7 +21,9 @@ import (
)

// ProduceBlockLoop is calling publishBlock in a loop as long as we're synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) (err error) {
// A signal will be sent to the bytesProduced channel for each block produced
// In this way it's possible to pause block production by not consuming the channel
func (m *Manager) ProduceBlockLoop(ctx context.Context, bytesProducedC chan int) error {
m.logger.Info("Started block producer loop.")

ticker := time.NewTicker(m.Conf.BlockTime)
@@ -36,52 +38,58 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) (err error) {
for {
select {
case <-ctx.Done():
return
return nil
case <-ticker.C:
// if empty blocks are configured to be enabled, and one is scheduled...
produceEmptyBlock := firstBlock || 0 == m.Conf.MaxIdleTime || nextEmptyBlock.Before(time.Now())
firstBlock = false

var (
block *types.Block
commit *types.Commit
)
block, commit, err = m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
block, commit, err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
if errors.Is(err, context.Canceled) {
m.logger.Error("Produce and gossip: context canceled.", "error", err)
return
return nil
}
if errors.Is(err, types.ErrSkippedEmptyBlock) {
if errors.Is(err, types.ErrEmptyBlock) { // occurs if the block was empty but we don't want to produce one
continue
}
if errors.Is(err, ErrNonRecoverable) {
m.logger.Error("Produce and gossip: non-recoverable.", "error", err) // TODO: flush? or don't log at all?
uevent.MustPublish(ctx, m.Pubsub, &events.DataHealthStatus{Error: err}, events.HealthStatusList)
return
return err
}
if err != nil {
m.logger.Error("Produce and gossip: uncategorized, assuming recoverable.", "error", err)
continue
}

// If IBC transactions are present, set proof required to true
// This will set a shorter timer for the next block
// currently we set it for all txs as we don't have a way to determine if an IBC tx is present (https://github.com/dymensionxyz/dymint/issues/709)
nextEmptyBlock = time.Now().Add(m.Conf.MaxIdleTime)
if 0 < len(block.Data.Txs) {
// the block wasn't empty so we want to make sure we don't wait too long before producing another one, in order to facilitate proofs for ibc
// TODO: optimize to only do this if IBC transactions are present (https://github.com/dymensionxyz/dymint/issues/709)
nextEmptyBlock = time.Now().Add(m.Conf.MaxProofTime)
} else {
m.logger.Info("Produced empty block.")
}

// Send the size to the accumulated size channel
// This will block in case the submitter is too slow and it's buffer is full
size := uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size())
bytesProducedN := block.SizeBytes() + commit.SizeBytes()
select {
case <-ctx.Done():
return
case m.producedSizeC <- size:
return nil
case bytesProducedC <- bytesProducedN:
default:
evt := &events.DataHealthStatus{Error: fmt.Errorf("bytes produced channel is full: %w", gerrc.ErrResourceExhausted)}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Error("Enough bytes to build a batch have been accumulated, but too many batches are pending submission." +
"Pausing block production until a signal is consumed.")
select {
case <-ctx.Done():
return nil
case bytesProducedC <- bytesProducedN:
evt := &events.DataHealthStatus{Error: nil}
uevent.MustPublish(ctx, m.Pubsub, evt, events.HealthStatusList)
m.logger.Info("Resumed block production.")
}
}

}
}
}
@@ -139,10 +147,10 @@ func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, er
return nil, nil, fmt.Errorf("load block: height: %d: %w: %w", newHeight, err, ErrNonRecoverable)
} else {
// limit to the max block data, so we don't create a block that is too big to fit in a batch
maxBlockDataSize := uint64(float64(m.Conf.BlockBatchMaxSizeBytes) * types.MaxBlockSizeAdjustment)
maxBlockDataSize := uint64(float64(m.Conf.BatchMaxSizeBytes) * types.MaxBlockSizeAdjustment)
block = m.Executor.CreateBlock(newHeight, lastCommit, lastHeaderHash, m.State, maxBlockDataSize)
if !allowEmpty && len(block.Data.Txs) == 0 {
return nil, nil, fmt.Errorf("%w: %w", types.ErrSkippedEmptyBlock, ErrRecoverable)
return nil, nil, fmt.Errorf("%w: %w", types.ErrEmptyBlock, ErrRecoverable)
}

abciHeaderPb := types.ToABCIHeaderPB(&block.Header)
@@ -201,8 +209,8 @@ func (m *Manager) createTMSignature(block *types.Block, proposerAddress []byte,
v := vote.ToProto()
// convert libp2p key to tm key
// TODO: move to types
raw_key, _ := m.LocalKey.Raw()
tmprivkey := tmed25519.PrivKey(raw_key)
rawKey, _ := m.LocalKey.Raw()
tmprivkey := tmed25519.PrivKey(rawKey)
tmprivkey.PubKey().Bytes()
// Create a mock validator to sign the vote
tmvalidator := tmtypes.NewMockPVWithParams(tmprivkey, false, false)
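
Aside: the pause/resume logic at the bottom of ProduceBlockLoop reduces to a non-blocking send followed by a blocking one on the same unbuffered channel. A minimal sketch of that pattern, with fmt.Println standing in for the DataHealthStatus events the real loop publishes (and with the ctx.Done() cases omitted):

package main

import (
    "fmt"
    "time"
)

// reportProduced mimics the send at the end of ProduceBlockLoop: try a
// non-blocking send first; if nobody is consuming, flag the pause and then
// block until the value is taken.
func reportProduced(bytesProducedC chan int, n int) {
    select {
    case bytesProducedC <- n:
        // fast path: the submitter is keeping up
    default:
        fmt.Println("unhealthy: too many bytes pending submission, pausing production")
        bytesProducedC <- n // blocks until consumed
        fmt.Println("healthy: resumed block production")
    }
}

func main() {
    bytesProducedC := make(chan int)

    // A deliberately slow consumer, so the first send hits the default branch.
    go func() {
        time.Sleep(500 * time.Millisecond)
        for n := range bytesProducedC {
            fmt.Println("consumed", n, "bytes")
        }
    }()

    reportProduced(bytesProducedC, 100)
    time.Sleep(50 * time.Millisecond) // give the consumer a moment to log
}

Because the channel is unbuffered, the blocking send in the default branch halts block production until the consumer takes the value: production pauses when too many bytes are pending submission and resumes once the submitter catches up.
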
29 changes: 13 additions & 16 deletions block/production_test.go
@@ -13,6 +13,7 @@ import (
"github.com/dymensionxyz/dymint/mempool"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/node/events"
uchannel "github.com/dymensionxyz/dymint/utils/channel"
uevent "github.com/dymensionxyz/dymint/utils/event"
tmcfg "github.com/tendermint/tendermint/config"

@@ -57,13 +58,11 @@ func TestCreateEmptyBlocksEnableDisable(t *testing.T) {

mCtx, cancel := context.WithTimeout(context.Background(), runTime)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
go managerWithEmptyBlocks.ProduceBlockLoop(mCtx)

buf1 := make(chan struct{}, 100) // dummy to avoid unhealthy event
buf2 := make(chan struct{}, 100) // dummy to avoid unhealthy event
go manager.AccumulatedDataLoop(mCtx, buf1)
go managerWithEmptyBlocks.AccumulatedDataLoop(mCtx, buf2)
bytesProduced1 := make(chan int)
bytesProduced2 := make(chan int)
go manager.ProduceBlockLoop(mCtx, bytesProduced1)
go managerWithEmptyBlocks.ProduceBlockLoop(mCtx, bytesProduced2)
uchannel.DrainForever(bytesProduced1, bytesProduced2)
<-mCtx.Done()

require.Greater(manager.State.Height(), initialHeight)
@@ -143,7 +142,9 @@ func TestCreateEmptyBlocksNew(t *testing.T) {

mCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
bytesProduced := make(chan int)
go manager.ProduceBlockLoop(mCtx, bytesProduced)
uchannel.DrainForever(bytesProduced)

<-time.Tick(1 * time.Second)
err = mpool.CheckTx([]byte{1, 2, 3, 4}, nil, mempool.TxInfo{})
@@ -182,7 +183,7 @@ func TestStopBlockProduction(t *testing.T) {
require := require.New(t)

managerConfig := testutil.GetManagerConfig()
managerConfig.BlockBatchMaxSizeBytes = 1000 // small batch size to fill up quickly
managerConfig.BatchMaxSizeBytes = 1000 // small batch size to fill up quickly
manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, nil, nil)
require.NoError(err)

@@ -201,14 +202,10 @@ func TestStopBlockProduction(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

go func() {
manager.ProduceBlockLoop(ctx)
wg.Done() // Decrease counter when this goroutine finishes
}()
bytesProducedC := make(chan int)

toSubmit := make(chan struct{})
go func() {
manager.AccumulatedDataLoop(ctx, toSubmit)
manager.ProduceBlockLoop(ctx, bytesProducedC)
wg.Done() // Decrease counter when this goroutine finishes
}()

@@ -232,7 +229,7 @@
assert.Equal(stoppedHeight, manager.State.Height())

// consume the signal
<-toSubmit
<-bytesProducedC

// check for health status event and block production to continue
select {
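
Aside: the tests above rely on uchannel.DrainForever to keep the unbuffered bytesProduced channels from blocking the production loops. A hedged guess at what such a helper looks like; the actual implementation in github.com/dymensionxyz/dymint/utils/channel may differ.

package uchannel

// DrainForever consumes everything sent on each channel in its own goroutine,
// so producers under test never block on an unread, unbuffered channel.
func DrainForever[T any](chs ...chan T) {
    for _, ch := range chs {
        go func(c chan T) {
            for range c {
            }
        }(ch)
    }
}
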
2 changes: 1 addition & 1 deletion block/state.go
@@ -145,7 +145,7 @@ func (e *Executor) UpdateMempoolAfterInitChain(s *types.State) {
e.mempool.SetPostCheckFn(mempool.PostCheckMaxGas(s.ConsensusParams.Block.MaxGas))
}

// Update state from Commit response
// UpdateStateAfterCommit using commit response
func (e *Executor) UpdateStateAfterCommit(s *types.State, resp *tmstate.ABCIResponses, appHash []byte, height uint64, valSet *tmtypes.ValidatorSet) {
copy(s.AppHash[:], appHash[:])
copy(s.LastResultsHash[:], tmtypes.NewResults(resp.DeliverTxs).Hash())