refactor: separate logic of aggregator with non aggregator (#558)
mtsitrin authored Feb 18, 2024
1 parent 996e681 commit 865d7a3
Showing 17 changed files with 344 additions and 387 deletions.
66 changes: 56 additions & 10 deletions block/manager.go
@@ -5,6 +5,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"code.cloudfoundry.org/go-diodes"

@@ -70,10 +71,11 @@ type Manager struct {
shouldProduceBlocksCh chan bool
produceEmptyBlockCh chan bool

syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond
syncTarget uint64
lastSubmissionTime int64
batchInProcess atomic.Value
isSyncedCond sync.Cond

produceBlockMutex sync.Mutex
applyCachedBlockMutex sync.Mutex

@@ -182,20 +184,62 @@ func NewManager(
// Start starts the block manager.
func (m *Manager) Start(ctx context.Context, isAggregator bool) error {
m.logger.Info("Starting the block manager")

err := m.syncBlockManager(ctx)
if err != nil {
err = fmt.Errorf("failed to sync block manager: %w", err)
return err
}

if isAggregator {
m.logger.Info("Starting in aggregator mode")
// TODO(omritoptix): change to private methods
go m.ProduceBlockLoop(ctx)
go m.SubmitLoop(ctx)
} else {
// TODO(omritoptix): change to private methods
go m.RetriveLoop(ctx)
go m.SyncTargetLoop(ctx)
}
// TODO(omritoptix): change to private methods
go m.RetriveLoop(ctx)
go m.SyncTargetLoop(ctx)
m.EventListener(ctx)

m.EventListener(ctx, isAggregator)

return nil
}
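
Both modes now share the same initial sync via syncBlockManager; only the loops differ. A minimal sketch of a caller, with `blockManager` and `nodeConfig.Aggregator` as illustrative names not taken from this diff:

```go
// Illustrative wiring only; the real call site lives outside this diff.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if err := blockManager.Start(ctx, nodeConfig.Aggregator); err != nil {
	return fmt.Errorf("start block manager: %w", err)
}
```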

// syncBlockManager ensures the node is synced on its initial run.
func (m *Manager) syncBlockManager(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
// Set the syncTarget according to the result
if err != nil {
// TODO: distinguish between a fresh rollapp and a non-registered rollapp
if err == settlement.ErrBatchNotFound {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
return nil
}
return err
}
atomic.StoreUint64(&m.syncTarget, resultRetrieveBatch.EndHeight)
err = m.syncUntilTarget(ctx, resultRetrieveBatch.EndHeight)
if err != nil {
return err
}

m.logger.Info("Synced", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
return nil
}
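
A hedged sketch of the fresh-rollapp invariant this function establishes: with no batches on the settlement layer, the sync target is genesis.InitialHeight-1, so there is nothing to sync and the first produced block is exactly InitialHeight. This hypothetical test assumes getManager (the package's existing test helper) backs the manager with an SL stub holding no batches:

```go
func TestSyncBlockManagerFreshRollapp(t *testing.T) { // hypothetical test
	manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, nil, nil)
	require.NoError(t, err)

	// With no batches on the SL, getLatestBatchFromSL is assumed to return
	// settlement.ErrBatchNotFound, which syncBlockManager treats as "fresh".
	require.NoError(t, manager.syncBlockManager(context.Background()))
	require.Equal(t, uint64(manager.genesis.InitialHeight-1),
		atomic.LoadUint64(&manager.syncTarget))
}
```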

// updateSyncParams updates the sync target and state index if necessary
func (m *Manager) updateSyncParams(endHeight uint64) {
rollappHubHeightGauge.Set(float64(endHeight))
m.logger.Info("Received new syncTarget", "syncTarget", endHeight)
atomic.StoreUint64(&m.syncTarget, endHeight)
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().UnixNano())
}
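
The counterpart to these atomic stores is that readers must use atomic loads. A sketch of a hypothetical helper (batchSubmissionOverdue is not in the codebase) showing how a submit loop might consume lastSubmissionTime:

```go
// batchSubmissionOverdue reports whether maxTime has elapsed since the last
// batch submission. Hypothetical helper, shown only to illustrate reading
// the fields that updateSyncParams stores atomically.
func (m *Manager) batchSubmissionOverdue(maxTime time.Duration) bool {
	last := time.Unix(0, atomic.LoadInt64(&m.lastSubmissionTime))
	return time.Since(last) >= maxTime
}
```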

func getAddress(key crypto.PrivKey) ([]byte, error) {
rawKey, err := key.GetPublic().Raw()
if err != nil {
@@ -205,9 +249,11 @@ func getAddress(key crypto.PrivKey) ([]byte, error) {
}

// EventListener registers events to callbacks.
func (m *Manager) EventListener(ctx context.Context) {
func (m *Manager) EventListener(ctx context.Context, isAggregator bool) {
go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "nodeHealthStatusHandler", events.EventQueryHealthStatus, m.healthStatusEventCallback, m.logger)
go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)
if !isAggregator {
go utils.SubscribeAndHandleEvents(ctx, m.pubsub, "ApplyBlockLoop", p2p.EventQueryNewNewGossipedBlock, m.applyBlockCallback, m.logger, 100)
}

}

33 changes: 10 additions & 23 deletions block/manager_test.go
@@ -121,9 +121,9 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
require.NotNil(t, manager)

t.Log("Taking the manager out of sync by submitting a batch")
lastStoreHeight := manager.store.Height()
syncTarget := atomic.LoadUint64(&manager.syncTarget)
numBatchesToAdd := 2
nextBatchStartHeight := atomic.LoadUint64(&manager.syncTarget) + 1
nextBatchStartHeight := syncTarget + 1
var batch *types.Batch
for i := 0; i < numBatchesToAdd; i++ {
batch, err = testutil.GenerateBatch(nextBatchStartHeight, nextBatchStartHeight+uint64(defaultBatchSize-1), manager.proposerKey)
@@ -136,30 +136,17 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
time.Sleep(time.Millisecond * 500)
}

t.Log("Validating manager can't produce blocks")
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
go manager.ProduceBlockLoop(ctx)
<-ctx.Done()
assert.Equal(t, lastStoreHeight, manager.store.Height())
// Wait until produce block loop is done
time.Sleep(time.Second * 1)

t.Log("Sync the manager")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
go manager.Start(ctx, false)
<-ctx.Done()
require.Greater(t, manager.store.Height(), lastStoreHeight)
assert.Equal(t, batch.EndHeight, manager.store.Height())
// Wait until manager is done
time.Sleep(time.Second * 4)
// Initially sync target is 0
assert.True(t, manager.syncTarget == 0)
assert.True(t, manager.store.Height() == 0)

t.Log("Validate blocks are produced")
ctx, cancel = context.WithTimeout(context.Background(), time.Second*3)
// enough time to sync and produce blocks
ctx, cancel := context.WithTimeout(context.Background(), time.Second*4)
defer cancel()
go manager.ProduceBlockLoop(ctx)
go manager.Start(ctx, true)
<-ctx.Done()
assert.True(t, manager.syncTarget == batch.EndHeight)
// validate that we produced blocks
assert.Greater(t, manager.store.Height(), batch.EndHeight)
}

43 changes: 1 addition & 42 deletions block/produce.go
@@ -3,60 +3,19 @@ package block
import (
"context"
"fmt"
"sync/atomic"
"time"

"cosmossdk.io/errors"
abciconv "github.com/dymensionxyz/dymint/conv/abci"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
tmed25519 "github.com/tendermint/tendermint/crypto/ed25519"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
tmtypes "github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)

// waitForSync enforces the aggregator to be synced before it can produce blocks.
// It requires the retriveBlockLoop to be running.
func (m *Manager) waitForSync(ctx context.Context) error {
resultRetrieveBatch, err := m.getLatestBatchFromSL(ctx)
// Set the syncTarget according to the result
if err == settlement.ErrBatchNotFound {
// Since we requested the latest batch and got batch not found it means
// the SL still hasn't got any batches for this chain.
m.logger.Info("No batches for chain found in SL. Start writing first batch")
atomic.StoreUint64(&m.syncTarget, uint64(m.genesis.InitialHeight-1))
return nil
} else if err != nil {
m.logger.Error("failed to retrieve batch from SL", "err", err)
return err
} else {
m.updateSyncParams(ctx, resultRetrieveBatch.EndHeight)
}
// Wait until isSynced is true and then call the PublishBlockLoop
m.isSyncedCond.L.Lock()
// Wait until we're synced and that we have got the latest batch (if we didn't, m.syncTarget == 0)
// before we start publishing blocks
for m.store.Height() < atomic.LoadUint64(&m.syncTarget) {
m.logger.Info("Waiting for sync", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
m.isSyncedCond.Wait()
}
m.isSyncedCond.L.Unlock()
m.logger.Info("Synced, Starting to produce", "current height", m.store.Height(), "syncTarget", atomic.LoadUint64(&m.syncTarget))
return nil
}

// ProduceBlockLoop calls publishBlock in a loop as long as we're synced.
func (m *Manager) ProduceBlockLoop(ctx context.Context) {
atomic.StoreInt64(&m.lastSubmissionTime, time.Now().Unix())

// We want to wait until we are synced. After that, since there is no leader
// election yet and leaders are elected manually, we will not be out of sync until
// we are manually replaced.
err := m.waitForSync(ctx)
if err != nil {
panic(errors.Wrap(err, "failed to wait for sync"))
}
m.logger.Debug("Started produce loop")

ticker := time.NewTicker(m.conf.BlockTime)
defer ticker.Stop()
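The body of the loop is collapsed in this view; it presumably follows the standard ticker pattern, sketched below with produceBlock standing in for the real per-tick work (the actual name is not visible in this diff):

```go
for {
	select {
	case <-ctx.Done():
		return
	case <-ticker.C:
		// produceBlock is a placeholder for the collapsed per-tick logic.
		if err := m.produceBlock(ctx); err != nil {
			m.logger.Error("failed to produce block", "error", err)
		}
	}
}
```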
60 changes: 57 additions & 3 deletions block/production_test.go
@@ -3,6 +3,7 @@ package block
import (
"context"
"fmt"
"sync"
"testing"
"time"

@@ -12,6 +13,7 @@ import (
"github.com/dymensionxyz/dymint/config"
"github.com/dymensionxyz/dymint/mempool"
mempoolv1 "github.com/dymensionxyz/dymint/mempool/v1"
"github.com/dymensionxyz/dymint/types"
tmcfg "github.com/tendermint/tendermint/config"

"github.com/dymensionxyz/dymint/testutil"
@@ -193,11 +195,63 @@ func TestBatchSubmissionAfterTimeout(t *testing.T) {
require.Equal(initialHeight, manager.store.Height())
require.True(manager.batchInProcess.Load() == false)

require.True(manager.syncTarget == 0)

var wg sync.WaitGroup
mCtx, cancel := context.WithTimeout(context.Background(), runTime)
defer cancel()
go manager.ProduceBlockLoop(mCtx)
go manager.SubmitLoop(mCtx)

wg.Add(2) // Add 2 because we have 2 goroutines

go func() {
defer wg.Done() // Decrease counter when this goroutine finishes
manager.ProduceBlockLoop(mCtx)
}()

go func() {
defer wg.Done() // Decrease counter when this goroutine finishes
manager.SubmitLoop(mCtx)
}()

<-mCtx.Done()
wg.Wait() // Wait for all goroutines to finish
require.True(manager.syncTarget > 0)
}

func TestInvalidBatch(t *testing.T) {
assert := assert.New(t)
require := require.New(t)

manager, err := getManager(getManagerConfig(), nil, nil, 1, 1, 0, nil, nil)
require.NoError(err)

require.True(manager.batchInProcess.Load() == true)
batchSize := uint64(5)
syncTarget := uint64(10)

// Create cases
cases := []struct {
startHeight uint64
endHeight uint64
shouldError bool
}{
{startHeight: syncTarget + 1, endHeight: syncTarget + batchSize, shouldError: false},
// batch with endHeight < startHeight
{startHeight: syncTarget + 1, endHeight: syncTarget, shouldError: true},
// batch with startHeight != previousEndHeight + 1
{startHeight: syncTarget, endHeight: syncTarget + batchSize + batchSize, shouldError: true},
}
for _, c := range cases {
batch := &types.Batch{
StartHeight: c.startHeight,
EndHeight: c.endHeight,
}

manager.updateSyncParams(syncTarget)
err := manager.validateBatch(batch)
if c.shouldError {
assert.Error(err)
} else {
assert.NoError(err)
}
}
}
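
validateBatch itself is not part of this diff; judging from the cases above, it presumably enforces that a batch is sequential to the current sync target and has a non-inverted height range. A sketch consistent with the test:

```go
// Sketch only; the real implementation lives elsewhere in the package.
func (m *Manager) validateBatch(batch *types.Batch) error {
	syncTarget := atomic.LoadUint64(&m.syncTarget)
	if batch.StartHeight != syncTarget+1 {
		return fmt.Errorf("batch start height %d is not sequential to sync target %d",
			batch.StartHeight, syncTarget)
	}
	if batch.EndHeight < batch.StartHeight {
		return fmt.Errorf("batch end height %d is before start height %d",
			batch.EndHeight, batch.StartHeight)
	}
	return nil
}
```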
3 changes: 2 additions & 1 deletion block/retriever.go
@@ -13,7 +13,8 @@ import (
// runs syncUntilTarget on the latest message in the ring buffer.
func (m *Manager) RetriveLoop(ctx context.Context) {
m.logger.Info("Started retrieve loop")
syncTargetpoller := diodes.NewPoller(m.syncTargetDiode)
syncTargetpoller := diodes.NewPoller(m.syncTargetDiode, diodes.WithPollingContext(ctx))

for {
select {
case <-ctx.Done():
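The point of diodes.WithPollingContext is that Next returns nil once ctx is canceled instead of blocking for the next value, letting the loop exit cleanly. A sketch of the consumption pattern (the loop body is collapsed in this diff; payload handling is elided):

```go
for {
	select {
	case <-ctx.Done():
		return
	default:
		event := syncTargetpoller.Next() // returns nil once ctx is canceled
		if event == nil {
			return
		}
		// decode the dequeued sync target and run syncUntilTarget (elided)
	}
}
```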
(Diffs for the remaining 12 changed files not loaded.)