Commit

fix: submit batch keep retrying even though batch accepted event received (#253)
omritoptix authored Feb 17, 2023
1 parent d93b63e commit e6f489c
Showing 2 changed files with 97 additions and 22 deletions.
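The gist of the fix: `submitBatchToSL` previously retried on a context that nothing cancelled, so an aggregator could keep resubmitting a batch the settlement layer had already accepted. The commit stores a cancellable retry context on the `Manager` and cancels it from the event handler once a batch-accepted event arrives. Below is a minimal, self-contained sketch of that pattern; `Submitter`, `submitWithRetry`, and `onBatchAccepted` are illustrative names (not from the codebase), and a plain stdlib loop stands in for the `retry-go` call the real code uses.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Submitter stands in for the block manager: it holds a cancellable context
// for the in-flight submission so an external event can abort the retry loop.
type Submitter struct {
	mu          sync.RWMutex
	retryCtx    context.Context
	retryCancel context.CancelFunc
}

func (s *Submitter) submitWithRetry(submit func() error) error {
	s.mu.Lock()
	s.retryCtx, s.retryCancel = context.WithCancel(context.Background())
	ctx := s.retryCtx
	s.mu.Unlock()
	defer s.retryCancel()

	for {
		if err := submit(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			// Cancelled because the batch was accepted elsewhere; stop retrying.
			return ctx.Err()
		case <-time.After(100 * time.Millisecond):
			// Delay, then retry.
		}
	}
}

// onBatchAccepted mirrors updateBatchAccepted in the diff: cancel any
// in-flight retry. The Err() check skips contexts that are already done.
func (s *Submitter) onBatchAccepted() {
	s.mu.Lock()
	if s.retryCtx != nil && s.retryCtx.Err() == nil {
		s.retryCancel()
	}
	s.mu.Unlock()
}

func main() {
	s := &Submitter{}
	go func() {
		time.Sleep(300 * time.Millisecond)
		s.onBatchAccepted() // the batch-accepted event arrives
	}()
	err := s.submitWithRetry(func() error { return errors.New("connection refused") })
	fmt.Println("retry loop ended:", err) // prints "context canceled"
}
```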
38 changes: 29 additions & 9 deletions block/manager.go
@@ -72,7 +72,10 @@ type Manager struct {

 	syncTargetDiode diodes.Diode

-	batchInProcess atomic.Value
+	batchInProcess   atomic.Value
+	batchRetryCtx    context.Context
+	batchRetryCancel context.CancelFunc
+	batchRetryMu     sync.RWMutex

 	syncTarget uint64
 	isSyncedCond sync.Cond
@@ -199,7 +202,7 @@ func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultR

 }

-// waitForSync enforaces the aggregator to be synced before it can produce blocks.
+// waitForSync enforces the aggregator to be synced before it can produce blocks.
 // It requires the retriveBlockLoop to be running.
 func (m *Manager) waitForSync(ctx context.Context) error {
 	resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
@@ -299,7 +302,9 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
			// the next batches to be published. For non-aggregators this is not needed.
			// We only want to send the next once the previous has been published successfully.
			// TODO(omritoptix): Once we have leader election, we can add a condition.
-			m.batchInProcess.Store(false)
+			// Update batch accepted is only relevant for the aggregator
+			// TODO(omritoptix): Check if we are the aggregator
+			m.updateBatchAccepted()
 		case <-subscription.Cancelled():
 			m.logger.Info("Subscription canceled")
@@ -313,6 +318,15 @@ func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
 	m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
 }

+func (m *Manager) updateBatchAccepted() {
+	m.batchRetryMu.Lock()
+	if m.batchRetryCtx != nil && m.batchRetryCtx.Err() == nil {
+		m.batchRetryCancel()
+	}
+	m.batchRetryMu.Unlock()
+	m.batchInProcess.Store(false)
+}
+
 // RetriveLoop listens for new sync messages written to a ring buffer and in turn
 // runs syncUntilTarget on the latest message in the ring buffer.
 func (m *Manager) RetriveLoop(ctx context.Context) {
@@ -659,7 +673,7 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
 	// Submit batch to SL
 	// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
 	// In that case we'll want to update the syncTarget before returning.
-	m.submitBatchToSL(ctx, nextBatch, resultSubmitToDA)
+	m.submitBatchToSL(nextBatch, resultSubmitToDA)
 }

 func (m *Manager) updateStateIndex(stateIndex uint64) error {
@@ -698,8 +712,12 @@ func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*type
 	return batch, nil
 }

-func (m *Manager) submitBatchToSL(ctx context.Context, batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) *settlement.ResultSubmitBatch {
+func (m *Manager) submitBatchToSL(batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) {
 	var resultSubmitToSL *settlement.ResultSubmitBatch
+	m.batchRetryMu.Lock()
+	m.batchRetryCtx, m.batchRetryCancel = context.WithCancel(context.Background())
+	m.batchRetryMu.Unlock()
+	defer m.batchRetryCancel()
 	// Submit batch to SL
 	err := retry.Do(func() error {
 		resultSubmitToSL = m.settlementClient.SubmitBatch(batch, m.dalc.GetClientType(), resultSubmitToDA)
@@ -708,12 +726,14 @@ func (m *Manager) submitBatchToSL(ctx context.Context, batch *types.Batch, resul
			return err
		}
		return nil
-	}, retry.Context(ctx), retry.LastErrorOnly(true), retry.Delay(SLBatchRetryDelay))
-	if err != nil {
-		m.logger.Error("Failed to submit batch to SL Layer", batch, err)
+	}, retry.Context(m.batchRetryCtx), retry.LastErrorOnly(true), retry.Delay(SLBatchRetryDelay))
+	// Panic if we failed not due to context cancellation
+	m.batchRetryMu.Lock()
+	if err != nil && m.batchRetryCtx.Err() == nil {
+		m.logger.Error("Failed to submit batch to SL Layer", "startHeight", batch.StartHeight, "EndHeight", batch.EndHeight, "error", err)
 		panic(err)
 	}
-	return resultSubmitToSL
+	m.batchRetryMu.Unlock()
 }

 func (m *Manager) submitBatchToDA(ctx context.Context, batch *types.Batch) (*da.ResultSubmitBatch, error) {
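Worth noting: with `avast/retry-go`, cancelling the context passed via `retry.Context` makes `retry.Do` stop and return a non-nil error, which is why the new code panics only when `err != nil && m.batchRetryCtx.Err() == nil`. A small standalone sketch of that behaviour (the delay, error message, and timings are arbitrary; retry-go's default attempt limit still applies):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/avast/retry-go"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Simulate the batch-accepted event arriving mid-retry.
	go func() {
		time.Sleep(250 * time.Millisecond)
		cancel()
	}()

	err := retry.Do(
		func() error { return errors.New("connection refused") },
		retry.Context(ctx), // stop retrying once ctx is cancelled
		retry.LastErrorOnly(true),
		retry.Delay(100*time.Millisecond),
	)

	// err is non-nil after cancellation, so callers must consult ctx.Err()
	// to tell deliberate cancellation apart from a genuine failure.
	if err != nil && ctx.Err() == nil {
		panic(err) // real failure (e.g. attempts exhausted)
	}
	fmt.Println("halted by cancellation:", err)
}
```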
81 changes: 68 additions & 13 deletions block/manager_test.go
@@ -119,7 +119,7 @@ func TestInitialState(t *testing.T) {
 // 2. Sync the manager
 // 3. Succeed to produce blocks
 func TestProduceOnlyAfterSynced(t *testing.T) {
-	manager, err := getManager(nil, nil, 1, 1, 0, nil, nil)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, nil, nil)
 	require.NoError(t, err)
 	require.NotNil(t, manager)
@@ -171,7 +171,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 }

 func TestPublishWhenSettlementLayerDisconnected(t *testing.T) {
-	manager, err := getManager(&SettlementLayerClientSubmitBatchError{}, nil, 1, 1, 0, nil, nil)
+	isSettlementError := atomic.Value{}
+	isSettlementError.Store(true)
+	manager, err := getManager(getManagerConfig(), &SettlementLayerClientSubmitBatchError{isError: isSettlementError}, nil, 1, 1, 0, nil, nil)
 	retry.DefaultAttempts = 2
 	require.NoError(t, err)
 	require.NotNil(t, manager)
@@ -188,11 +190,11 @@ func TestPublishWhenSettlementLayerDisconnected(t *testing.T) {
		err := recover().(error)
		assert.ErrorContains(t, err, connectionRefusedErrorMessage)
	}()
-	manager.submitBatchToSL(context.Background(), nil, nil)
+	manager.submitBatchToSL(&types.Batch{StartHeight: 1, EndHeight: 1}, nil)
 }

 func TestPublishWhenDALayerDisconnected(t *testing.T) {
-	manager, err := getManager(nil, &DALayerClientSubmitBatchError{}, 1, 1, 0, nil, nil)
+	manager, err := getManager(getManagerConfig(), nil, &DALayerClientSubmitBatchError{}, 1, 1, 0, nil, nil)
 	retry.DefaultAttempts = 2
 	require.NoError(t, err)
 	require.NotNil(t, manager)
@@ -208,7 +210,7 @@ func TestPublishWhenDALayerDisconnected(t *testing.T) {
 }

 func TestRetrieveDaBatchesFailed(t *testing.T) {
-	manager, err := getManager(nil, &DALayerClientRetrieveBatchesError{}, 1, 1, 0, nil, nil)
+	manager, err := getManager(getManagerConfig(), nil, &DALayerClientRetrieveBatchesError{}, 1, 1, 0, nil, nil)
 	require.NoError(t, err)
 	require.NotNil(t, manager)
@@ -227,7 +229,7 @@ func TestProduceNewBlock(t *testing.T) {
 	err := proxyApp.Start()
 	require.NoError(t, err)
 	// Init manager
-	manager, err := getManager(nil, nil, 1, 1, 0, proxyApp, nil)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
 	require.NoError(t, err)
 	// Produce block
 	err = manager.produceBlock(context.Background())
@@ -248,7 +250,7 @@ func TestProducePendingBlock(t *testing.T) {
 	err := proxyApp.Start()
 	require.NoError(t, err)
 	// Init manager
-	manager, err := getManager(nil, nil, 1, 1, 0, proxyApp, nil)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
 	require.NoError(t, err)
 	// Generate block and commit and save it to the store
 	blocks, err := testutil.GenerateBlocks(1, 1, manager.proposerKey)
Expand Down Expand Up @@ -283,7 +285,7 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
// Create a new mock store which should succeed to save the first block
mockStore := newMockStore()
// Init manager
manager, err := getManager(nil, nil, 1, 1, 0, proxyApp, mockStore)
manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, mockStore)
require.NoError(err)

cases := []struct {
Expand Down Expand Up @@ -364,11 +366,58 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
}
}

// Test batch retry halts upon new batch acceptance
// 1. Produce blocks with settlement layer batch submission error
// 2. Emit an event that a new batch was accepted
// 3. Validate new batches was submitted
func TestBatchRetryWhileBatchAccepted(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
app := testutil.GetAppMock()
// Create proxy app
clientCreator := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(clientCreator)
err := proxyApp.Start()
require.NoError(err)
// Init manager
managerConfig := getManagerConfig()
managerConfig.BlockBatchSize = 1
isSettlementError := atomic.Value{}
isSettlementError.Store(true)
settlementLayerClient := &SettlementLayerClientSubmitBatchError{isError: isSettlementError}
manager, err := getManager(managerConfig, settlementLayerClient, nil, 1, 1, 0, proxyApp, nil)
require.NoError(err)
// Produce blocks with settlement layer batch submission error
blockLoopContext, blockLoopCancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.Background())
defer blockLoopCancel()
defer cancel()
go manager.ProduceBlockLoop(blockLoopContext)
go manager.SyncTargetLoop(ctx)
time.Sleep(200 * time.Millisecond)
assert.Equal(uint64(0), atomic.LoadUint64(&settlementLayerClient.batchCounter))
// Cancel block production to not interfere with the isBatchInProcess flag
blockLoopCancel()
time.Sleep(100 * time.Millisecond)
// Emit an event that a new batch was accepted and wait for it to be processed
eventData := &settlement.EventDataNewSettlementBatchAccepted{EndHeight: 1, StateIndex: 1}
manager.pubsub.PublishWithEvents(ctx, eventData, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
time.Sleep(200 * time.Millisecond)
// Change settlement layer to accept batches and validate new batches was submitted
settlementLayerClient.isError.Store(false)
blockLoopContext, blockLoopCancel = context.WithCancel(context.Background())
defer blockLoopCancel()
go manager.ProduceBlockLoop(blockLoopContext)
time.Sleep(1 * time.Second)
assert.Greater(atomic.LoadUint64(&settlementLayerClient.batchCounter), uint64(0))

}

/* -------------------------------------------------------------------------- */
/* utils */
/* -------------------------------------------------------------------------- */

func getManager(settlementlc settlement.LayerClient, dalc da.DataAvailabilityLayerClient, genesisHeight int64, storeInitialHeight int64, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*Manager, error) {
func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerClient, dalc da.DataAvailabilityLayerClient, genesisHeight int64, storeInitialHeight int64, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*Manager, error) {
genesis := testutil.GenerateGenesis(genesisHeight)
// Change the LastBlockHeight to avoid calling InitChainSync within the manager
// And updating the state according to the genesis.
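The new test synchronizes with fixed `time.Sleep` calls, which keeps it simple but makes it timing-sensitive. A hedged alternative for the final assertion (a sketch, not part of the commit) would poll the mock's counter with testify's `Eventually` instead of sleeping a full second; here `counter` stands in for `settlementLayerClient.batchCounter`:

```go
package main

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// Sketch: polling assertion in place of a fixed sleep.
func TestEventuallySketch(t *testing.T) {
	var counter uint64
	go func() {
		time.Sleep(100 * time.Millisecond)
		atomic.AddUint64(&counter, 1) // a batch submission succeeds
	}()
	require.Eventually(t, func() bool {
		return atomic.LoadUint64(&counter) > 0
	}, 2*time.Second, 50*time.Millisecond, "expected at least one submitted batch")
}
```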
@@ -382,7 +431,7 @@ func getManager(settlementlc settlement.LayerClient, dalc da.DataAvailabilityLay
 	if _, err := managerStore.UpdateState(state, nil); err != nil {
 		return nil, err
 	}
-	conf := getManagerConfig()
+
 	logger := log.TestingLogger()
 	pubsubServer := pubsub.NewServer()
 	pubsubServer.Start()
@@ -532,13 +581,19 @@ func newMockStore() *MockStore {
 }

 type SettlementLayerClientSubmitBatchError struct {
+	isError      atomic.Value
+	batchCounter uint64
 	slmock.SettlementLayerClient
 }

-func (s *SettlementLayerClientSubmitBatchError) SubmitBatch(_ *types.Batch, _ da.Client, _ *da.ResultSubmitBatch) *settlement.ResultSubmitBatch {
-	return &settlement.ResultSubmitBatch{
-		BaseResult: settlement.BaseResult{Code: settlement.StatusError, Message: connectionRefusedErrorMessage},
-	}
+func (s *SettlementLayerClientSubmitBatchError) SubmitBatch(batch *types.Batch, daClient da.Client, daResultSubmitBatch *da.ResultSubmitBatch) *settlement.ResultSubmitBatch {
+	if s.isError.Load() == true {
+		return &settlement.ResultSubmitBatch{
+			BaseResult: settlement.BaseResult{Code: settlement.StatusError, Message: connectionRefusedErrorMessage},
+		}
+	}
+	atomic.AddUint64(&s.batchCounter, 1)
+	return &settlement.ResultSubmitBatch{BaseResult: settlement.BaseResult{Code: settlement.StatusSuccess}}
 }

 type DALayerClientSubmitBatchError struct {
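A design note on the mock: `isError` is an `atomic.Value` holding a `bool`, hence the `== true` comparison on the `interface{}` returned by `Load()`. On Go 1.19+, the typed `sync/atomic.Bool` and `atomic.Uint64` express the same thing more directly; a self-contained sketch under that assumption (`mockSubmitter` is an illustrative stand-in, not the test's type):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// mockSubmitter mirrors the test's mock settlement client, but with
// Go 1.19+ typed atomics instead of atomic.Value.
type mockSubmitter struct {
	isError      atomic.Bool
	batchCounter atomic.Uint64
}

func (m *mockSubmitter) SubmitBatch() error {
	if m.isError.Load() { // no interface{} boxing, no `== true`
		return fmt.Errorf("connection refused")
	}
	m.batchCounter.Add(1)
	return nil
}

func main() {
	m := &mockSubmitter{}
	m.isError.Store(true)
	fmt.Println(m.SubmitBatch()) // connection refused
	m.isError.Store(false)       // flip the flag, as the test does
	fmt.Println(m.SubmitBatch(), m.batchCounter.Load()) // <nil> 1
}
```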
