feat!: block batch submission determined by max bytes (#794)
Co-authored-by: danwt <[email protected]>
2 people authored and omritoptix committed May 18, 2024
1 parent 11bbc79 commit 56c0bbd
Showing 30 changed files with 423 additions and 764 deletions.
66 changes: 20 additions & 46 deletions block/manager.go
@@ -14,7 +14,6 @@ import (

"code.cloudfoundry.org/go-diodes"

"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/libp2p/go-libp2p/core/crypto"

@@ -51,39 +50,22 @@ type Manager struct {
SLClient settlement.LayerI

// Data retrieval
Retriever da.BatchRetriever

Retriever da.BatchRetriever
SyncTargetDiode diodes.Diode
SyncTarget atomic.Uint64

// Block production
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool
producedSizeCh chan uint64 // channel for the producer to report the size of the block it produced

/*
Guard against triggering a new batch submission when the old one is still going on (taking a while)
*/
submitBatchMutex sync.Mutex

/*
Protect against producing two blocks at once if the first one is taking a while
Also, used to protect against the block production that occurs when batch submission thread
creates its empty block.
*/
produceBlockMutex sync.Mutex
// Submitter
AccumulatedBatchSize atomic.Uint64

/*
Protect against processing two blocks at once when there are two routines handling incoming gossiped blocks,
and incoming DA blocks, respectively.
*/
retrieverMutex sync.Mutex

// pendingBatch is the result of the last DA submission
// that is pending settlement layer submission.
// It is used to avoid double submission of the same batch.
// It's protected by submitBatchMutex.
pendingBatch *PendingBatch

logger types.Logger

// Cached blocks and commits for applying at future heights. The blocks may not be valid, because
@@ -121,23 +103,21 @@ func NewManager(
}

agg := &Manager{
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
LastState: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
// channels are buffered to avoid blocking on input/output operations, buffer sizes are arbitrary
SyncTargetDiode: diodes.NewOneToOne(1, nil),
shouldProduceBlocksCh: make(chan bool, 1),
produceEmptyBlockCh: make(chan bool, 1),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
Pubsub: pubsub,
p2pClient: p2pClient,
ProposerKey: proposerKey,
Conf: conf,
Genesis: genesis,
LastState: s,
Store: store,
Executor: exec,
DAClient: dalc,
SLClient: settlementClient,
Retriever: dalc.(da.BatchRetriever),
SyncTargetDiode: diodes.NewOneToOne(1, nil),
producedSizeCh: make(chan uint64),
logger: logger,
blockCache: make(map[uint64]CachedBlock),
}

return agg, nil
@@ -181,7 +161,7 @@ func (m *Manager) Start(ctx context.Context) error {
}

if isAggregator {
go uevent.MustSubscribe(ctx, m.Pubsub, "nodeHealth", events.QueryHealthStatus, m.onNodeHealthStatus, m.logger)
// TODO: populate the accumulatedSize on startup
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
@@ -231,12 +211,6 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {
return tmcrypto.AddressHash(rawKey), nil
}

func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
eventData := event.Data().(*events.DataHealthStatus)
m.logger.Info("Received node health status event.", "eventData", eventData)
m.shouldProduceBlocksCh <- eventData.Error == nil
}

// TODO: move to gossip.go
// onNewGossipedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
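The manager.go hunks above drop the health-status pause/resume plumbing (shouldProduceBlocksCh, onNodeHealthStatus) and add two fields that tie block production to batch submission: producedSizeCh, on which the producer reports the byte size of every block it produces, and AccumulatedBatchSize, which the submitter tracks against the configured max batch size. The submitter side is not among the files shown here, so what follows is only a minimal, self-contained Go sketch of that producer/submitter handoff under those assumptions; producer, submitter and maxBatchBytes are hypothetical stand-ins rather than dymint identifiers.

// Illustrative sketch (not the dymint implementation): the producer reports the
// size of each produced block on a channel; the submitter accumulates those
// sizes and triggers a batch submission once a byte threshold is reached.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

const maxBatchBytes = 500 // hypothetical stand-in for BlockBatchMaxSizeBytes

func producer(ctx context.Context, sizes chan<- uint64) {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return
		case sizes <- uint64(100 + i): // pretend block+commit size in bytes
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func submitter(ctx context.Context, sizes <-chan uint64) {
	var accumulated atomic.Uint64 // mirrors the AccumulatedBatchSize field
	for {
		select {
		case <-ctx.Done():
			return
		case sz := <-sizes:
			if accumulated.Add(sz) >= maxBatchBytes {
				fmt.Println("submitting batch of ~", accumulated.Load(), "bytes")
				accumulated.Store(0) // start accumulating the next batch
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	sizes := make(chan uint64) // unbuffered, like producedSizeCh in NewManager
	go producer(ctx, sizes)
	submitter(ctx, sizes)
}

Because the size channel is unbuffered, a slow submitter applies backpressure to the producer, which matches the produce.go comment that sending the size will block if the submitter is too slow.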
89 changes: 6 additions & 83 deletions block/manager_test.go
@@ -3,7 +3,6 @@ package block_test
import (
"context"
"crypto/rand"
"errors"
"testing"
"time"

@@ -12,7 +11,6 @@ import (
"github.com/stretchr/testify/require"

"github.com/dymensionxyz/dymint/block"
"github.com/dymensionxyz/dymint/node/events"
"github.com/dymensionxyz/dymint/p2p"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/testutil"
@@ -183,7 +181,7 @@ func TestProduceNewBlock(t *testing.T) {
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(t, err)
// Produce block
err = manager.ProduceAndGossipBlock(context.Background(), true)
_, _, err = manager.ProduceAndGossipBlock(context.Background(), true)
require.NoError(t, err)
// Validate state is updated with the commit hash
assert.Equal(t, uint64(1), manager.Store.Height())
@@ -210,86 +208,12 @@ func TestProducePendingBlock(t *testing.T) {
_, err = manager.Store.SaveBlock(block, &block.LastCommit, nil)
require.NoError(t, err)
// Produce block
err = manager.ProduceAndGossipBlock(context.Background(), true)
_, _, err = manager.ProduceAndGossipBlock(context.Background(), true)
require.NoError(t, err)
// Validate state is updated with the block that was saved in the store
assert.Equal(t, block.Header.Hash(), *(*[32]byte)(manager.LastState.LastBlockID.Hash))
}

// TestBlockProductionNodeHealth tests the different scenarios of block production when the node health is toggling.
// The test does the following:
// 1. Send healthy event and validate blocks are produced
// 2. Send unhealthy event and validate blocks are not produced
// 3. Send another unhealthy event and validate blocks are still not produced
// 4. Send healthy event and validate blocks are produced
func TestBlockProductionNodeHealth(t *testing.T) {
require := require.New(t)
assert := assert.New(t)
// Setup app
app := testutil.GetAppMock()
// Create proxy app
clientCreator := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(err)
// Init manager
manager, err := testutil.GetManager(testutil.GetManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)

cases := []struct {
name string
healthStatusEvent map[string][]string
healthStatusEventData interface{}
shouldProduceBlocks bool
}{
{
name: "HealthyEventBlocksProduced",
healthStatusEvent: events.HealthStatusList,
healthStatusEventData: &events.DataHealthStatus{},
shouldProduceBlocks: true,
},
{
name: "UnhealthyEventBlocksNotProduced",
healthStatusEvent: events.HealthStatusList,
healthStatusEventData: &events.DataHealthStatus{Error: errors.New("unhealthy")},
shouldProduceBlocks: false,
},
{
name: "UnhealthyEventBlocksStillNotProduced",
healthStatusEvent: events.HealthStatusList,
healthStatusEventData: &events.DataHealthStatus{Error: errors.New("unhealthy")},
shouldProduceBlocks: false,
},
{
name: "HealthyEventBlocksProduced",
healthStatusEvent: events.HealthStatusList,
healthStatusEventData: &events.DataHealthStatus{},
shouldProduceBlocks: true,
},
}
// Start the manager
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = manager.Start(ctx)
require.NoError(err)
time.Sleep(100 * time.Millisecond)

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := manager.Pubsub.PublishWithEvents(context.Background(), c.healthStatusEventData, c.healthStatusEvent)
assert.NoError(err, "PublishWithEvents should not produce an error")
time.Sleep(500 * time.Millisecond)
blockHeight := manager.Store.Height()
time.Sleep(500 * time.Millisecond)
if c.shouldProduceBlocks {
assert.Greater(manager.Store.Height(), blockHeight)
} else {
assert.Equal(blockHeight, manager.Store.Height())
}
})
}
}

// Test that in case we fail after the proxy app commit, next time we won't commit again to the proxy app
// and only update the store height and app hash. This test does the following:
// 1. Produce first block successfully
@@ -382,7 +306,7 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
})
mockStore.ShouldFailSetHeight = tc.shouldFailSetSetHeight
mockStore.ShoudFailUpdateState = tc.shouldFailUpdateState
_ = manager.ProduceAndGossipBlock(context.Background(), true)
_, _, _ = manager.ProduceAndGossipBlock(context.Background(), true)
require.Equal(tc.expectedStoreHeight, manager.Store.Height(), tc.name)
require.Equal(tc.expectedStateAppHash, manager.LastState.AppHash, tc.name)
storeState, err := manager.Store.LoadState()
@@ -408,7 +332,6 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
require.NoError(err)
// Init manager
managerConfig := testutil.GetManagerConfig()
managerConfig.BlockBatchSize = 1000
managerConfig.BlockBatchMaxSizeBytes = batchLimitBytes // enough for 2 blocks, not enough for 10 blocks
manager, err := testutil.GetManager(managerConfig, nil, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
@@ -437,14 +360,14 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// Produce blocks
for i := 0; i < tc.blocksToProduce; i++ {
err := manager.ProduceAndGossipBlock(ctx, true)
_, _, err := manager.ProduceAndGossipBlock(ctx, true)
assert.NoError(err)
}

// Call createNextDABatch function
startHeight := manager.SyncTarget.Load() + 1
endHeight := startHeight + uint64(tc.blocksToProduce) - 1
batch, err := manager.CreateNextDABatch(startHeight, endHeight)
batch, err := manager.CreateNextBatchToSubmit(startHeight, endHeight)
assert.NoError(err)

assert.Equal(batch.StartHeight, startHeight)
@@ -459,7 +382,7 @@ func TestCreateNextDABatchWithBytesLimit(t *testing.T) {
// validate next added block to batch would have been actually too big
// First relax the byte limit so we could produce a larger batch
manager.Conf.BlockBatchMaxSizeBytes = 10 * manager.Conf.BlockBatchMaxSizeBytes
newBatch, err := manager.CreateNextDABatch(startHeight, batch.EndHeight+1)
newBatch, err := manager.CreateNextBatchToSubmit(startHeight, batch.EndHeight+1)
assert.Greater(newBatch.ToProto().Size(), batchLimitBytes)

assert.NoError(err)
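The test changes above reflect the renamed CreateNextBatchToSubmit (formerly CreateNextDABatch) and the removal of the block-count based BlockBatchSize setting: the batch boundary is now determined by BlockBatchMaxSizeBytes alone, so when too many blocks are produced the returned batch is expected to end before the requested end height, and relaxing the limit confirms that the next block really would have pushed it over. The batching code itself is not among the displayed files, so the following is only a rough, self-contained sketch of size-capped packing; packBatch, blockSizes and maxBatchBytes are hypothetical names, and the real batch size is measured on the encoded batch rather than by summing block sizes.

// Rough sketch of size-capped batch packing: keep appending blocks until the
// next one would push the batch past the byte limit.
package main

import "fmt"

// packBatch returns the index one past the last block that still fits in maxBytes.
func packBatch(blockSizes []int, maxBytes int) int {
	total := 0
	for i, sz := range blockSizes {
		if total+sz > maxBytes {
			return i // adding block i would exceed the limit, so stop before it
		}
		total += sz
	}
	return len(blockSizes)
}

func main() {
	blockSizes := []int{300, 250, 400, 350} // pretend encoded block+commit sizes
	const maxBatchBytes = 1000

	end := packBatch(blockSizes, maxBatchBytes)
	// With these sizes the first three blocks fit (950 bytes) and the fourth
	// would exceed the limit, so the batch ends early at index 3.
	fmt.Printf("batch covers blocks [0, %d) within a %d-byte limit\n", end, maxBatchBytes)
}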
66 changes: 31 additions & 35 deletions block/produce.go
@@ -22,34 +22,20 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
ticker := time.NewTicker(m.Conf.BlockTime)
defer ticker.Stop()

// Allow the initial block to be empty
produceEmptyBlock := true

var emptyBlocksTimer <-chan time.Time
resetEmptyBlocksTimer := func() {}
// Setup ticker for empty blocks if enabled
if 0 < m.Conf.EmptyBlocksMaxTime {
t := time.NewTimer(m.Conf.EmptyBlocksMaxTime)
emptyBlocksTimer = t.C
resetEmptyBlocksTimer = func() {
produceEmptyBlock = false
t.Reset(m.Conf.EmptyBlocksMaxTime)
}
defer t.Stop()
}
var nextEmptyBlock time.Time
firstBlock := true

for {
select {
case <-ctx.Done(): // Context canceled
case <-ctx.Done():
return
case <-m.produceEmptyBlockCh: // If we got a request for an empty block produce it and don't wait for the ticker
produceEmptyBlock = true
case <-emptyBlocksTimer: // Empty blocks timeout
produceEmptyBlock = true
m.logger.Debug(fmt.Sprintf("no transactions, producing empty block: elapsed: %.2f", m.Conf.EmptyBlocksMaxTime.Seconds()))
// Produce block
case <-ticker.C:
err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)

// if empty blocks are configured to be enabled, and one is scheduled...
produceEmptyBlock := firstBlock || 0 == m.Conf.MaxIdleTime || nextEmptyBlock.Before(time.Now())
firstBlock = false

block, commit, err := m.ProduceAndGossipBlock(ctx, produceEmptyBlock)
if errors.Is(err, context.Canceled) {
m.logger.Error("produce and gossip: context canceled", "error", err)
return
@@ -65,33 +51,43 @@ func (m *Manager) ProduceBlockLoop(ctx context.Context) {
m.logger.Error("produce and gossip: uncategorized, assuming recoverable", "error", err)
continue
}
resetEmptyBlocksTimer()
case shouldProduceBlocks := <-m.shouldProduceBlocksCh:
for !shouldProduceBlocks {
m.logger.Info("block production paused - awaiting positive continuation signal")
shouldProduceBlocks = <-m.shouldProduceBlocksCh

// If IBC transactions are present, set proof required to true
// This will set a shorter timer for the next block
// currently we set it for all txs as we don't have a way to determine if an IBC tx is present (https://github.com/dymensionxyz/dymint/issues/709)
nextEmptyBlock = time.Now().Add(m.Conf.MaxIdleTime)
if 0 < len(block.Data.Txs) {
nextEmptyBlock = time.Now().Add(m.Conf.MaxProofTime)
} else {
m.logger.Info("produced empty block")
}

// Send the size to the accumulated size channel
// This will block in case the submitter is too slow and its buffer is full
size := uint64(block.ToProto().Size()) + uint64(commit.ToProto().Size())
select {
case <-ctx.Done():
return
case m.producedSizeCh <- size:
}
m.logger.Info("resumed block production")
}
}
}

func (m *Manager) ProduceAndGossipBlock(ctx context.Context, allowEmpty bool) error {
func (m *Manager) ProduceAndGossipBlock(ctx context.Context, allowEmpty bool) (*types.Block, *types.Commit, error) {
block, commit, err := m.produceBlock(allowEmpty)
if err != nil {
return fmt.Errorf("produce block: %w", err)
return nil, nil, fmt.Errorf("produce block: %w", err)
}

if err := m.gossipBlock(ctx, *block, *commit); err != nil {
return fmt.Errorf("gossip block: %w", err)
return nil, nil, fmt.Errorf("gossip block: %w", err)
}

return nil
return block, commit, nil
}

func (m *Manager) produceBlock(allowEmpty bool) (*types.Block, *types.Commit, error) {
m.produceBlockMutex.Lock()
defer m.produceBlockMutex.Unlock()
var (
lastCommit *types.Commit
lastHeaderHash [32]byte
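The rewritten ProduceBlockLoop above replaces the empty-block timer and produceEmptyBlockCh with a single nextEmptyBlock deadline: an empty block is allowed on the first iteration, when MaxIdleTime is zero, or once the deadline has passed, and after each block the deadline is pushed out by MaxIdleTime, or by the shorter MaxProofTime when the block carried transactions. Below is a minimal, self-contained sketch of just that scheduling decision; blockTime, maxIdleTime, maxProofTime and the hasTxs toggle are hypothetical stand-ins for m.Conf and the block contents, and the skip when no empty block is due is inlined here, whereas the real loop delegates it to produceBlock through the allowEmpty flag.

// Illustrative sketch of the deadline-based empty-block scheduling.
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		blockTime    = 20 * time.Millisecond  // stand-in for m.Conf.BlockTime
		maxIdleTime  = 100 * time.Millisecond // stand-in for m.Conf.MaxIdleTime
		maxProofTime = 40 * time.Millisecond  // stand-in for m.Conf.MaxProofTime
	)

	var nextEmptyBlock time.Time
	firstBlock := true

	ticker := time.NewTicker(blockTime)
	defer ticker.Stop()

	for i := 0; i < 10; i++ {
		<-ticker.C

		// An empty block is allowed on the first tick, when idle blocks are
		// effectively disabled (maxIdleTime == 0), or once the deadline has passed.
		produceEmptyBlock := firstBlock || maxIdleTime == 0 || nextEmptyBlock.Before(time.Now())
		firstBlock = false

		hasTxs := i%3 == 0 // pretend every third tick has pending transactions
		if !hasTxs && !produceEmptyBlock {
			continue // no txs and no empty block due yet: produce nothing
		}
		fmt.Printf("tick %d: produced block (txs=%v, empty allowed=%v)\n", i, hasTxs, produceEmptyBlock)

		// Schedule the next forced empty block: MaxIdleTime by default, or the
		// shorter MaxProofTime when the block carried transactions.
		nextEmptyBlock = time.Now().Add(maxIdleTime)
		if hasTxs {
			nextEmptyBlock = time.Now().Add(maxProofTime)
		}
	}
}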
(diffs for the remaining 27 changed files are not shown)
