
fix(manager): removes outdated syncTarget field #854

Merged: 29 commits, May 16, 2024
Changes from 4 commits

Commits
7b280ee
BROKEN - few changes to start removing sync target
danwt May 13, 2024
744d90c
removes UpdateSyncParams as a func
danwt May 13, 2024
c4db6ff
renames sync to target -> sync to target height
danwt May 13, 2024
6b3f56b
renames and reshuffles
danwt May 13, 2024
e17b234
removes all non test reads of sync target
danwt May 13, 2024
16cd5db
fix all non test access
danwt May 13, 2024
a587a01
rename to LastSubmittedHeight
danwt May 13, 2024
e1de9b2
removes test references
danwt May 13, 2024
88abca6
fix retrieve loop
danwt May 13, 2024
a054d81
get rid of the unnecessary prune check
danwt May 13, 2024
afde4f8
rename SyncToTargetHeightLoop
danwt May 13, 2024
0facc1e
simplify
danwt May 13, 2024
f290b86
simplify metric
danwt May 13, 2024
d308d95
test fix
danwt May 13, 2024
9606d91
improve docstring
danwt May 13, 2024
df608b5
merge main
danwt May 15, 2024
c1da14f
merge in main and manually resolve conflicts (BROKEN)
danwt May 16, 2024
ad7c279
adds in more manually resolved conflicts (BROKEN)
danwt May 16, 2024
448db03
fix one breakage
danwt May 16, 2024
b34da8d
rename and move retriever mu
danwt May 16, 2024
a37f10a
tweaks
danwt May 16, 2024
742ee1e
fmt
danwt May 16, 2024
56df81d
adds a protective check for pruning, but need to make last submitted …
danwt May 16, 2024
a22895f
use atomic on last submitted
danwt May 16, 2024
e64d78e
fix comment
danwt May 16, 2024
7bce876
add a sequencer check, but not sure it works
danwt May 16, 2024
11757d3
moved pre prune check to prune func
danwt May 16, 2024
c05a41f
move prune if cond
danwt May 16, 2024
74ce466
merge main and resolve conflict
danwt May 16, 2024
block/block.go (3 additions, 1 deletion)
@@ -93,7 +93,9 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
 	}

 	// Prune old heights, if requested by ABCI app.
-	if retainHeight > 0 {
+	wantToPrune := 0 < retainHeight
+	canPrune := !m.IsSequencer() || (uint64(retainHeight) <= m.NextHeightToSubmit()) // do not delete anything that we might submit in future
+	if wantToPrune && canPrune {
 		_, err := m.pruneBlocks(uint64(retainHeight))
 		if err != nil {
 			m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
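The guard above is the heart of the pruning fix: a full node may honor any retain height the ABCI app requests, but the sequencer must keep every block it might still submit to the settlement layer. A minimal standalone sketch of the same predicate (function and parameter names here are illustrative stand-ins, not dymint's API):

package main

import "fmt"

// canPrune mirrors the predicate added to applyBlock: pruning is always safe
// on a non-sequencer, and safe on the sequencer only for heights at or below
// the next height it would submit.
func canPrune(isSequencer bool, retainHeight int64, nextHeightToSubmit uint64) bool {
	wantToPrune := 0 < retainHeight
	safe := !isSequencer || uint64(retainHeight) <= nextHeightToSubmit
	return wantToPrune && safe
}

func main() {
	next := uint64(11) // last submitted height is 10, so the next submission starts at 11
	fmt.Println(canPrune(true, 11, next))  // true: does not cross the submission frontier
	fmt.Println(canPrune(true, 50, next))  // false: would drop blocks not yet submitted
	fmt.Println(canPrune(false, 50, next)) // true: full nodes may prune freely
}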
block/manager.go (23 additions, 9 deletions)
@@ -60,7 +60,9 @@
 	AccumulatedBatchSize atomic.Uint64
 	// The last height which was submitted to both sublayers, that we know of. When we produce new batches, we will
 	// start at this height + 1. Note: only accessed by one thread at a time so doesn't need synchro.
-	LastSubmittedHeight uint64
+	// It is ALSO used by the producer, because the producer needs to check if it can prune blocks, and it won't
+	// prune anything that might be submitted in the future. Therefore, it must be atomic.
+	LastSubmittedHeight atomic.Uint64

 	/*
 		Retrieval

@@ -130,13 +132,11 @@
 func (m *Manager) Start(ctx context.Context) error {
 	m.logger.Info("Starting the block manager")

-	slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
-	localProposerKey, err := m.ProposerKey.GetPublic().Raw()
+	isSequencer, err := m.IsSequencerVerify()
 	if err != nil {
-		return fmt.Errorf("get local node public key: %w", err)
+		return err
 	}

-	isSequencer := bytes.Equal(slProposerKey, localProposerKey)
 	m.logger.Info("Starting block manager", "isSequencer", isSequencer)

 	// Check if InitChain flow is needed

@@ -164,14 +164,28 @@
 		go m.SubmitLoop(ctx)
 	} else {
 		go m.RetrieveLoop(ctx)
+		go m.SyncToTargetHeightLoop(ctx)
 	}

 	return nil
 }

+func (m *Manager) IsSequencerVerify() (bool, error) {
+	slProposerKey := m.SLClient.GetProposer().PublicKey.Bytes()
+	localProposerKey, err := m.ProposerKey.GetPublic().Raw()
+	if err != nil {
+		return false, fmt.Errorf("get local node public key: %w", err)
+	}
+	return bytes.Equal(slProposerKey, localProposerKey), nil
+}
+
+func (m *Manager) IsSequencer() bool {
+	ret, _ := m.IsSequencerVerify()
+	return ret
+}
+
 func (m *Manager) NextHeightToSubmit() uint64 {
-	return m.LastSubmittedHeight + 1
+	return m.LastSubmittedHeight.Load() + 1
 }

 // syncBlockManager enforces the node to be synced on initial run.

@@ -180,19 +194,19 @@
 	if errors.Is(err, gerr.ErrNotFound) {
 		// The SL hasn't got any batches for this chain yet.
 		m.logger.Info("No batches for chain found in SL.")
-		m.LastSubmittedHeight = uint64(m.Genesis.InitialHeight - 1)
+		m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
 		return nil
 	}
 	if err != nil {
 		// TODO: separate between fresh rollapp and non-registered rollapp
 		return err
 	}
-	m.LastSubmittedHeight = res.EndHeight
+	m.LastSubmittedHeight.Store(res.EndHeight)
 	err = m.syncToTargetHeight(res.EndHeight)
 	if err != nil {
 		return err
 	}

-	m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight)
+	m.logger.Info("Synced.", "current height", m.State.Height(), "last submitted height", m.LastSubmittedHeight.Load())
 	return nil
 }
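The change from a plain uint64 to atomic.Uint64 is what makes the prune check above sound: the submit loop writes LastSubmittedHeight while the block producer reads it, and with a plain field that pair is a data race. A self-contained sketch of the access pattern, using only the standard library (the manager type here is a stand-in, not the real one):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// manager is a stand-in: one goroutine stores the field while another loads
// it, so the field must be atomic to pass `go test -race`.
type manager struct {
	lastSubmittedHeight atomic.Uint64
}

func (m *manager) nextHeightToSubmit() uint64 {
	return m.lastSubmittedHeight.Load() + 1
}

func main() {
	m := &manager{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // submitter: records a successful batch submission
		defer wg.Done()
		m.lastSubmittedHeight.Store(42)
	}()
	go func() { // producer: sees either 1 or 43, never a torn value
		defer wg.Done()
		fmt.Println("next height to submit:", m.nextHeightToSubmit())
	}()
	wg.Wait()
}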
block/manager_test.go (2 additions, 2 deletions)
@@ -134,7 +134,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 	}

 	// Initially sync target is 0
-	assert.Zero(t, manager.LastSubmittedHeight)
+	assert.Zero(t, manager.LastSubmittedHeight.Load())
 	assert.True(t, manager.State.Height() == 0)

 	// enough time to sync and produce blocks

@@ -148,7 +148,7 @@ func TestProduceOnlyAfterSynced(t *testing.T) {
 		assert.NoError(err)
 	}()
 	<-ctx.Done()
-	assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight)
+	assert.Equal(t, batch.EndHeight, manager.LastSubmittedHeight.Load())
 	// validate that we produced blocks
 	assert.Greater(t, manager.State.Height(), batch.EndHeight)
 }
block/submit.go (1 addition, 1 deletion)
@@ -137,7 +137,7 @@ func (m *Manager) HandleSubmissionTrigger() error {
 	}

 	types.RollappHubHeightGauge.Set(float64(actualEndHeight))
-	m.LastSubmittedHeight = actualEndHeight
+	m.LastSubmittedHeight.Store(actualEndHeight)
 	return nil
 }
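Note the ordering in HandleSubmissionTrigger: the store runs only after the batch was accepted, so a concurrent reader such as the prune check can under-read the submitted height (safe, it just prunes less) but never over-read it. Continuing the stand-in manager type from the sketch above, with submitBatch as a hypothetical settlement-layer call:

// handleSubmission sketches the publish-after-success ordering used above.
// submitBatch is a hypothetical stand-in for the settlement-layer client.
func (m *manager) handleSubmission(endHeight uint64, submitBatch func() error) error {
	if err := submitBatch(); err != nil {
		return err // the old height stays visible; nothing unsubmitted is ever reported
	}
	// Publish only after success: concurrent prune checks may see a stale,
	// smaller height, which is safe, but never a height that was not submitted.
	m.lastSubmittedHeight.Store(endHeight)
	return nil
}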
block/submit_test.go (11 additions, 11 deletions)
@@ -39,17 +39,17 @@ func TestBatchSubmissionHappyFlow(t *testing.T) {
 	// Check initial assertions
 	initialHeight := uint64(0)
 	require.Zero(manager.State.Height())
-	require.Zero(manager.LastSubmittedHeight)
+	require.Zero(manager.LastSubmittedHeight.Load())

 	// Produce block and validate that we produced blocks
 	_, _, err = manager.ProduceAndGossipBlock(ctx, true)
 	require.NoError(err)
 	assert.Greater(t, manager.State.Height(), initialHeight)
-	assert.Zero(t, manager.LastSubmittedHeight)
+	assert.Zero(t, manager.LastSubmittedHeight.Load())

 	// submit and validate sync target
 	manager.HandleSubmissionTrigger()
-	assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight)
+	assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load())
 }

 func TestBatchSubmissionFailedSubmission(t *testing.T) {

@@ -86,13 +86,13 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
 	// Check initial assertions
 	initialHeight := uint64(0)
 	require.Zero(manager.State.Height())
-	require.Zero(manager.LastSubmittedHeight)
+	require.Zero(manager.LastSubmittedHeight.Load())

 	// Produce block and validate that we produced blocks
 	_, _, err = manager.ProduceAndGossipBlock(ctx, true)
 	require.NoError(err)
 	assert.Greater(t, manager.State.Height(), initialHeight)
-	assert.Zero(t, manager.LastSubmittedHeight)
+	assert.Zero(t, manager.LastSubmittedHeight.Load())

 	// try to submit, we expect failure
 	mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("submit batch")).Once()

@@ -101,7 +101,7 @@ func TestBatchSubmissionFailedSubmission(t *testing.T) {
 	// try to submit again, we expect success
 	mockLayerI.On("SubmitBatch", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
 	manager.HandleSubmissionTrigger()
-	assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight)
+	assert.EqualValues(t, manager.State.Height(), manager.LastSubmittedHeight.Load())
 }

 // TestSubmissionByTime tests the submission trigger by time

@@ -135,7 +135,7 @@ func TestSubmissionByTime(t *testing.T) {
 	// Check initial height
 	initialHeight := uint64(0)
 	require.Equal(initialHeight, manager.State.Height())
-	require.Zero(manager.LastSubmittedHeight)
+	require.Zero(manager.LastSubmittedHeight.Load())

 	var wg sync.WaitGroup
 	mCtx, cancel := context.WithTimeout(context.Background(), 2*submitTimeout)

@@ -154,7 +154,7 @@ func TestSubmissionByTime(t *testing.T) {
 	}()

 	wg.Wait() // Wait for all goroutines to finish
-	require.True(0 < manager.LastSubmittedHeight)
+	require.True(0 < manager.LastSubmittedHeight.Load())
 }

 // TestSubmissionByBatchSize tests the submission trigger by batch size

@@ -201,7 +201,7 @@ func TestSubmissionByBatchSize(t *testing.T) {
 	}()

 	go func() {
-		assert.Zero(manager.LastSubmittedHeight)
+		assert.Zero(manager.LastSubmittedHeight.Load())
 		manager.SubmitLoop(ctx)
 		wg.Done() // Decrease counter when this goroutine finishes
 	}()

@@ -215,9 +215,9 @@ func TestSubmissionByBatchSize(t *testing.T) {
 	wg.Wait() // Wait for all goroutines to finish

 	if c.expectedSubmission {
-		assert.Positive(manager.LastSubmittedHeight)
+		assert.Positive(manager.LastSubmittedHeight.Load())
 	} else {
-		assert.Zero(manager.LastSubmittedHeight)
+		assert.Zero(manager.LastSubmittedHeight.Load())
 	}
 }
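Since every assertion above now goes through Load(), a natural companion, hypothetical and not part of this PR, is a regression test that fails under go test -race if the field ever regresses to a plain uint64:

package block_test

import (
	"sync"
	"sync/atomic"
	"testing"
)

// TestLastSubmittedHeightConcurrentAccess hammers the field from a writer and
// a reader goroutine; with a plain uint64 the race detector would flag it.
func TestLastSubmittedHeightConcurrentAccess(t *testing.T) {
	var h atomic.Uint64 // stands in for manager.LastSubmittedHeight
	var wg sync.WaitGroup
	for i := 1; i <= 100; i++ {
		wg.Add(2)
		go func(v uint64) { defer wg.Done(); h.Store(v) }(uint64(i)) // submitter side
		go func() { defer wg.Done(); _ = h.Load() + 1 }()            // producer side
	}
	wg.Wait()
	if got := h.Load(); got == 0 || got > 100 {
		t.Fatalf("unexpected last submitted height: %d", got)
	}
}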