diff --git a/block/manager.go b/block/manager.go
index 312cf3ad6..0285f4dd0 100644
--- a/block/manager.go
+++ b/block/manager.go
@@ -72,7 +72,10 @@ type Manager struct {
 
 	syncTargetDiode diodes.Diode
 
-	batchInProcess atomic.Value
+	batchInProcess   atomic.Value
+	batchRetryCtx    context.Context
+	batchRetryCancel context.CancelFunc
+	batchRetryMu     sync.RWMutex
 
 	syncTarget   uint64
 	isSyncedCond sync.Cond
@@ -199,7 +202,7 @@ func (m *Manager) getLatestBatchFromSL(ctx context.Context) (*settlement.ResultR
 
 }
 
-// waitForSync enforaces the aggregator to be synced before it can produce blocks.
+// waitForSync enforces the aggregator to be synced before it can produce blocks.
 // It requires the retriveBlockLoop to be running.
 func (m *Manager) waitForSync(ctx context.Context) error {
 	resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
@@ -299,7 +302,9 @@ func (m *Manager) SyncTargetLoop(ctx context.Context) {
 			// the next batches to be published. For non-aggregators this is not needed.
 			// We only want to send the next once the previous has been published successfully.
 			// TODO(omritoptix): Once we have leader election, we can add a condition.
-			m.batchInProcess.Store(false)
+			// Update batch accepted is only relevant for the aggregator
+			// TODO(omritoptix): Check if we are the aggregator
+			m.updateBatchAccepted()
 		case <-subscription.Cancelled():
 			m.logger.Info("Subscription canceled")
 		}
@@ -313,6 +318,15 @@ func (m *Manager) updateSyncParams(ctx context.Context, endHeight uint64) {
 	m.syncTargetDiode.Set(diodes.GenericDataType(&endHeight))
 }
 
+func (m *Manager) updateBatchAccepted() {
+	m.batchRetryMu.Lock()
+	if m.batchRetryCtx != nil && m.batchRetryCtx.Err() == nil {
+		m.batchRetryCancel()
+	}
+	m.batchRetryMu.Unlock()
+	m.batchInProcess.Store(false)
+}
+
 // RetriveLoop listens for new sync messages written to a ring buffer and in turn
 // runs syncUntilTarget on the latest message in the ring buffer.
 func (m *Manager) RetriveLoop(ctx context.Context) {
@@ -659,7 +673,7 @@ func (m *Manager) submitNextBatch(ctx context.Context) {
 	// Submit batch to SL
 	// TODO(omritoptix): Handle a case where the SL submission fails due to syncTarget out of sync with the latestHeight in the SL.
 	// In that case we'll want to update the syncTarget before returning.
-	m.submitBatchToSL(ctx, nextBatch, resultSubmitToDA)
+	m.submitBatchToSL(nextBatch, resultSubmitToDA)
 }
 
 func (m *Manager) updateStateIndex(stateIndex uint64) error {
@@ -698,8 +712,12 @@ func (m *Manager) createNextDABatch(startHeight uint64, endHeight uint64) (*type
 	return batch, nil
 }
 
-func (m *Manager) submitBatchToSL(ctx context.Context, batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) *settlement.ResultSubmitBatch {
+func (m *Manager) submitBatchToSL(batch *types.Batch, resultSubmitToDA *da.ResultSubmitBatch) {
 	var resultSubmitToSL *settlement.ResultSubmitBatch
+	m.batchRetryMu.Lock()
+	m.batchRetryCtx, m.batchRetryCancel = context.WithCancel(context.Background())
+	m.batchRetryMu.Unlock()
+	defer m.batchRetryCancel()
 	// Submit batch to SL
 	err := retry.Do(func() error {
 		resultSubmitToSL = m.settlementClient.SubmitBatch(batch, m.dalc.GetClientType(), resultSubmitToDA)
@@ -708,12 +726,14 @@ func (m *Manager) submitBatchToSL(ctx context.Context, batch *types.Batch, resul
 			return err
 		}
 		return nil
-	}, retry.Context(ctx), retry.LastErrorOnly(true), retry.Delay(SLBatchRetryDelay))
-	if err != nil {
-		m.logger.Error("Failed to submit batch to SL Layer", batch, err)
+	}, retry.Context(m.batchRetryCtx), retry.LastErrorOnly(true), retry.Delay(SLBatchRetryDelay))
+	// Panic if we failed not due to context cancellation
+	m.batchRetryMu.Lock()
+	if err != nil && m.batchRetryCtx.Err() == nil {
+		m.logger.Error("Failed to submit batch to SL Layer", "startHeight", batch.StartHeight, "EndHeight", batch.EndHeight, "error", err)
 		panic(err)
 	}
-	return resultSubmitToSL
+	m.batchRetryMu.Unlock()
 }
 
 func (m *Manager) submitBatchToDA(ctx context.Context, batch *types.Batch) (*da.ResultSubmitBatch, error) {
diff --git a/block/manager_test.go b/block/manager_test.go
index 069d7d93e..fbada7d1c 100644
--- a/block/manager_test.go
+++ b/block/manager_test.go
@@ -119,7 +119,7 @@ func TestInitialState(t *testing.T) {
 // 2. Sync the manager
 // 3. Succeed to produce blocks
 func TestProduceOnlyAfterSynced(t *testing.T) {
-	manager, err := getManager(nil, nil, 1, 1, 0, nil, nil)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, nil, nil)
 	require.NoError(t, err)
 	require.NotNil(t, manager)
 
@@ -171,7 +171,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 }
 
 func TestPublishWhenSettlementLayerDisconnected(t *testing.T) {
-	manager, err := getManager(&SettlementLayerClientSubmitBatchError{}, nil, 1, 1, 0, nil, nil)
+	isSettlementError := atomic.Value{}
+	isSettlementError.Store(true)
+	manager, err := getManager(getManagerConfig(), &SettlementLayerClientSubmitBatchError{isError: isSettlementError}, nil, 1, 1, 0, nil, nil)
 	retry.DefaultAttempts = 2
 	require.NoError(t, err)
 	require.NotNil(t, manager)
@@ -188,11 +190,11 @@ func TestPublishWhenSettlementLayerDisconnected(t *testing.T) {
 		err := recover().(error)
 		assert.ErrorContains(t, err, connectionRefusedErrorMessage)
 	}()
-	manager.submitBatchToSL(context.Background(), nil, nil)
+	manager.submitBatchToSL(&types.Batch{StartHeight: 1, EndHeight: 1}, nil)
 }
 
 func TestPublishWhenDALayerDisconnected(t *testing.T) {
-	manager, err := getManager(nil, &DALayerClientSubmitBatchError{}, 1, 1, 0, nil, nil)
+	manager, err := getManager(getManagerConfig(), nil, &DALayerClientSubmitBatchError{}, 1, 1, 0, nil, nil)
 	retry.DefaultAttempts = 2
 	require.NoError(t, err)
 	require.NotNil(t, manager)
@@ -208,7 +210,7 @@ func TestPublishWhenDALayerDisconnected(t *testing.T) {
 }
 
 func TestRetrieveDaBatchesFailed(t *testing.T) {
-	manager, err := getManager(nil, &DALayerClientRetrieveBatchesError{}, 1, 1, 0, nil, nil)
+	manager, err := getManager(getManagerConfig(), nil, &DALayerClientRetrieveBatchesError{}, 1, 1, 0, nil, nil)
 	require.NoError(t, err)
 	require.NotNil(t, manager)
 
@@ -227,7 +229,7 @@ func TestProduceNewBlock(t *testing.T) {
 	err := proxyApp.Start()
 	require.NoError(t, err)
 	// Init manager
-	manager, err := getManager(nil, nil, 1, 1, 0, proxyApp, nil)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
 	require.NoError(t, err)
 	// Produce block
 	err = manager.produceBlock(context.Background())
@@ -248,7 +250,7 @@ func TestProducePendingBlock(t *testing.T) {
 	err := proxyApp.Start()
 	require.NoError(t, err)
 	// Init manager
-	manager, err := getManager(nil, nil, 1, 1, 0, proxyApp, nil)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, nil)
 	require.NoError(t, err)
 	// Generate block and commit and save it to the store
 	blocks, err := testutil.GenerateBlocks(1, 1, manager.proposerKey)
@@ -283,7 +285,7 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
 	// Create a new mock store which should succeed to save the first block
 	mockStore := newMockStore()
 	// Init manager
-	manager, err := getManager(nil, nil, 1, 1, 0, proxyApp, mockStore)
+	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, proxyApp, mockStore)
 	require.NoError(err)
 
 	cases := []struct {
@@ -364,11 +366,58 @@ func TestProduceBlockFailAfterCommit(t *testing.T) {
 	}
 }
 
+// Test batch retry halts upon new batch acceptance
+// 1. Produce blocks with settlement layer batch submission error
+// 2. Emit an event that a new batch was accepted
+// 3. Validate that new batches were submitted
+func TestBatchRetryWhileBatchAccepted(t *testing.T) {
+	assert := assert.New(t)
+	require := require.New(t)
+	app := testutil.GetAppMock()
+	// Create proxy app
+	clientCreator := proxy.NewLocalClientCreator(app)
+	proxyApp := proxy.NewAppConns(clientCreator)
+	err := proxyApp.Start()
+	require.NoError(err)
+	// Init manager
+	managerConfig := getManagerConfig()
+	managerConfig.BlockBatchSize = 1
+	isSettlementError := atomic.Value{}
+	isSettlementError.Store(true)
+	settlementLayerClient := &SettlementLayerClientSubmitBatchError{isError: isSettlementError}
+	manager, err := getManager(managerConfig, settlementLayerClient, nil, 1, 1, 0, proxyApp, nil)
+	require.NoError(err)
+	// Produce blocks with settlement layer batch submission error
+	blockLoopContext, blockLoopCancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(context.Background())
+	defer blockLoopCancel()
+	defer cancel()
+	go manager.ProduceBlockLoop(blockLoopContext)
+	go manager.SyncTargetLoop(ctx)
+	time.Sleep(200 * time.Millisecond)
+	assert.Equal(uint64(0), atomic.LoadUint64(&settlementLayerClient.batchCounter))
+	// Cancel block production to not interfere with the isBatchInProcess flag
+	blockLoopCancel()
+	time.Sleep(100 * time.Millisecond)
+	// Emit an event that a new batch was accepted and wait for it to be processed
+	eventData := &settlement.EventDataNewSettlementBatchAccepted{EndHeight: 1, StateIndex: 1}
+	manager.pubsub.PublishWithEvents(ctx, eventData, map[string][]string{settlement.EventTypeKey: {settlement.EventNewSettlementBatchAccepted}})
+	time.Sleep(200 * time.Millisecond)
+	// Change the settlement layer to accept batches and validate that new batches were submitted
+	settlementLayerClient.isError.Store(false)
+	blockLoopContext, blockLoopCancel = context.WithCancel(context.Background())
+	defer blockLoopCancel()
+	go manager.ProduceBlockLoop(blockLoopContext)
+	time.Sleep(1 * time.Second)
+	assert.Greater(atomic.LoadUint64(&settlementLayerClient.batchCounter), uint64(0))
+
+}
+
 /* -------------------------------------------------------------------------- */
 /*                                    utils                                   */
 /* -------------------------------------------------------------------------- */
 
-func getManager(settlementlc settlement.LayerClient, dalc da.DataAvailabilityLayerClient, genesisHeight int64, storeInitialHeight int64, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*Manager, error) {
+func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerClient, dalc da.DataAvailabilityLayerClient, genesisHeight int64, storeInitialHeight int64, storeLastBlockHeight int64, proxyAppConns proxy.AppConns, mockStore store.Store) (*Manager, error) {
 	genesis := testutil.GenerateGenesis(genesisHeight)
 	// Change the LastBlockHeight to avoid calling InitChainSync within the manager
 	// And updating the state according to the genesis.
@@ -382,7 +431,7 @@ func getManager(settlementlc settlement.LayerClient, dalc da.DataAvailabilityLay
 	if _, err := managerStore.UpdateState(state, nil); err != nil {
 		return nil, err
 	}
-	conf := getManagerConfig()
+
 	logger := log.TestingLogger()
 	pubsubServer := pubsub.NewServer()
 	pubsubServer.Start()
@@ -532,13 +581,19 @@ func newMockStore() *MockStore {
 }
 
 type SettlementLayerClientSubmitBatchError struct {
+	isError      atomic.Value
+	batchCounter uint64
 	slmock.SettlementLayerClient
 }
 
-func (s *SettlementLayerClientSubmitBatchError) SubmitBatch(_ *types.Batch, _ da.Client, _ *da.ResultSubmitBatch) *settlement.ResultSubmitBatch {
-	return &settlement.ResultSubmitBatch{
-		BaseResult: settlement.BaseResult{Code: settlement.StatusError, Message: connectionRefusedErrorMessage},
+func (s *SettlementLayerClientSubmitBatchError) SubmitBatch(batch *types.Batch, daClient da.Client, daResultSubmitBatch *da.ResultSubmitBatch) *settlement.ResultSubmitBatch {
+	if s.isError.Load() == true {
+		return &settlement.ResultSubmitBatch{
+			BaseResult: settlement.BaseResult{Code: settlement.StatusError, Message: connectionRefusedErrorMessage},
+		}
 	}
+	atomic.AddUint64(&s.batchCounter, 1)
+	return &settlement.ResultSubmitBatch{BaseResult: settlement.BaseResult{Code: settlement.StatusSuccess}}
 }
 
 type DALayerClientSubmitBatchError struct {
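The core change in this patch is that submitBatchToSL now retries against a manager-owned, cancellable context rather than the caller's context, and updateBatchAccepted cancels that context when the settlement layer reports the batch as accepted, so a stuck retry loop does not block the next batch. Below is a minimal, self-contained sketch of that cancellation pattern using only the Go standard library; it deliberately avoids the retry-go helpers used in the real code, and the submitter type and its method names are illustrative assumptions, not identifiers from this repository.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// submitter holds a cancellable context for the in-flight retry loop,
// loosely mirroring the batchRetryCtx/batchRetryCancel/batchRetryMu fields
// added to Manager in the diff (names here are hypothetical).
type submitter struct {
	mu          sync.Mutex
	retryCtx    context.Context
	retryCancel context.CancelFunc
}

// submitWithRetry keeps calling submitFn until it succeeds or the retry
// context is cancelled (e.g. because the batch was accepted out of band).
func (s *submitter) submitWithRetry(submitFn func() error, delay time.Duration) {
	s.mu.Lock()
	s.retryCtx, s.retryCancel = context.WithCancel(context.Background())
	s.mu.Unlock()
	defer s.retryCancel()

	for {
		if err := submitFn(); err == nil {
			return // submission succeeded
		}
		select {
		case <-s.retryCtx.Done():
			// Cancelled by onBatchAccepted: stop retrying without failing.
			fmt.Println("retry stopped:", s.retryCtx.Err())
			return
		case <-time.After(delay):
			// Delay elapsed; try again.
		}
	}
}

// onBatchAccepted mirrors updateBatchAccepted in the diff: it cancels any
// in-flight retry loop so the manager can move on to the next batch.
func (s *submitter) onBatchAccepted() {
	s.mu.Lock()
	if s.retryCtx != nil && s.retryCtx.Err() == nil {
		s.retryCancel()
	}
	s.mu.Unlock()
}

func main() {
	s := &submitter{}
	done := make(chan struct{})
	go func() {
		defer close(done)
		// Simulate a settlement layer that keeps refusing the batch.
		s.submitWithRetry(func() error { return errors.New("connection refused") }, 50*time.Millisecond)
	}()
	time.Sleep(120 * time.Millisecond)
	s.onBatchAccepted() // simulate a "batch accepted" event arriving
	<-done
}
```

The sketch only illustrates why the retry loop in the patch checks m.batchRetryCtx.Err() before panicking: a cancellation triggered by batch acceptance is expected and must not be treated as a submission failure.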