-
Notifications
You must be signed in to change notification settings - Fork 281
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(L1 follower): adjust to recent CodecV7
and contract changes
#1120
base: develop
Are you sure you want to change the base?
Changes from 71 commits
ea5a603
b30006f
de37d47
e34fecf
1327771
b05954d
ce8f785
f10c383
cb0a90e
b918a2b
e51182d
4e6f759
fd6bff3
ab3e873
4ced6f2
6aafa74
da81a2e
8750045
2499c69
fb4fe7c
9bf2f25
d222f58
c56be0d
3950e58
a043d2f
94c0ad5
67c1866
43d54cb
4290d16
574dd53
53b6ebf
ab3bedf
3335654
3e18f7f
2342335
7c14639
634d1f1
0fa3743
e048f53
a2a68e8
ca6649e
80976ad
4022989
d4cc897
a2cb3d1
6d5af23
3c21f4e
0bd6eb1
9139e94
e9154ca
8820949
8ba140c
e6af5b6
b794dda
438ec09
91aae90
b715d4f
4f74920
ec6df85
4d100c1
ee53754
9fc77d8
cdc4b67
8d2167b
5c6a8d5
5f08a5c
87278ff
4c3c16f
973bd91
ffd2c13
22215e8
5b6a6ae
5935e62
46a5ef3
86020e4
bd6f89e
6c2b57c
8078781
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,7 +51,7 @@ require ( | |
github.com/prometheus/tsdb v0.7.1 | ||
github.com/rjeczalik/notify v0.9.1 | ||
github.com/rs/cors v1.7.0 | ||
github.com/scroll-tech/da-codec v0.1.3-0.20250210041951-d028c537b995 | ||
github.com/scroll-tech/da-codec v0.1.3-0.20250219115141-f9608ed9904d | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we change it to support set code transactions? i.e. latest commit in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
github.com/scroll-tech/zktrie v0.8.4 | ||
github.com/shirou/gopsutil v3.21.11+incompatible | ||
github.com/sourcegraph/conc v0.3.0 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,7 +7,9 @@ import ( | |
"github.com/scroll-tech/go-ethereum/common" | ||
"github.com/scroll-tech/go-ethereum/core/rawdb" | ||
"github.com/scroll-tech/go-ethereum/ethdb" | ||
"github.com/scroll-tech/go-ethereum/log" | ||
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" | ||
"github.com/scroll-tech/go-ethereum/rollup/l1" | ||
) | ||
|
||
// BatchQueue is a pipeline stage that reads all batch events from DAQueue and provides only finalized batches to the next stage. | ||
|
@@ -17,20 +19,23 @@ type BatchQueue struct { | |
lastFinalizedBatchIndex uint64 | ||
batches *common.Heap[da.Entry] | ||
batchesMap *common.ShrinkingMap[uint64, *common.HeapElement[da.Entry]] | ||
|
||
previousBatch *rawdb.DAProcessedBatchMeta | ||
} | ||
|
||
func NewBatchQueue(DAQueue *DAQueue, db ethdb.Database) *BatchQueue { | ||
func NewBatchQueue(DAQueue *DAQueue, db ethdb.Database, lastProcessedBatch *rawdb.DAProcessedBatchMeta) *BatchQueue { | ||
jonastheis marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return &BatchQueue{ | ||
DAQueue: DAQueue, | ||
db: db, | ||
lastFinalizedBatchIndex: 0, | ||
lastFinalizedBatchIndex: lastProcessedBatch.BatchIndex, | ||
batches: common.NewHeap[da.Entry](), | ||
batchesMap: common.NewShrinkingMap[uint64, *common.HeapElement[da.Entry]](1000), | ||
previousBatch: lastProcessedBatch, | ||
} | ||
} | ||
|
||
// NextBatch finds next finalized batch and returns data, that was committed in that batch | ||
func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) { | ||
func (bq *BatchQueue) NextBatch(ctx context.Context) (da.EntryWithBlocks, error) { | ||
if batch := bq.getFinalizedBatch(); batch != nil { | ||
return batch, nil | ||
} | ||
|
@@ -50,7 +55,9 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) { | |
case da.CommitBatchV0Type, da.CommitBatchWithBlobType: | ||
bq.addBatch(daEntry) | ||
case da.RevertBatchType: | ||
bq.deleteBatch(daEntry) | ||
if err = bq.handleRevertEvent(daEntry.Event()); err != nil { | ||
return nil, fmt.Errorf("failed to handle revert event: %w", err) | ||
} | ||
case da.FinalizeBatchType: | ||
if daEntry.BatchIndex() > bq.lastFinalizedBatchIndex { | ||
bq.lastFinalizedBatchIndex = daEntry.BatchIndex() | ||
|
@@ -66,15 +73,16 @@ func (bq *BatchQueue) NextBatch(ctx context.Context) (da.Entry, error) { | |
} | ||
|
||
// getFinalizedBatch returns next finalized batch if there is available | ||
func (bq *BatchQueue) getFinalizedBatch() da.Entry { | ||
func (bq *BatchQueue) getFinalizedBatch() da.EntryWithBlocks { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit counter-intuitive that a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. renamed |
||
if bq.batches.Len() == 0 { | ||
return nil | ||
} | ||
|
||
batch := bq.batches.Peek().Value() | ||
// we process all batches smaller or equal to the last finalized batch index -> this reflects bundles of multiple batches | ||
// where we only receive the finalize event for the last batch of the bundle. | ||
if batch.BatchIndex() <= bq.lastFinalizedBatchIndex { | ||
bq.deleteBatch(batch) | ||
return batch | ||
return bq.processAndDeleteBatch(batch) | ||
} else { | ||
return nil | ||
} | ||
|
@@ -85,25 +93,85 @@ func (bq *BatchQueue) addBatch(batch da.Entry) { | |
bq.batchesMap.Set(batch.BatchIndex(), heapElement) | ||
} | ||
|
||
// deleteBatch deletes data committed in the batch from map, because this batch is reverted or finalized | ||
// updates DASyncedL1BlockNumber | ||
func (bq *BatchQueue) deleteBatch(batch da.Entry) { | ||
batchHeapElement, exists := bq.batchesMap.Get(batch.BatchIndex()) | ||
func (bq *BatchQueue) handleRevertEvent(event l1.RollupEvent) error { | ||
switch event.Type() { | ||
case l1.RevertEventV0Type: | ||
revertBatch, ok := event.(*l1.RevertBatchEventV0) | ||
if !ok { | ||
return fmt.Errorf("unexpected type of revert event: %T, expected RevertEventV0Type", event) | ||
} | ||
|
||
bq.deleteBatch(revertBatch.BatchIndex().Uint64()) | ||
case l1.RevertEventV7Type: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not related to this PR, but would this event affect the handling logic of bridge-history |
||
revertBatch, ok := event.(*l1.RevertBatchEventV7) | ||
if !ok { | ||
return fmt.Errorf("unexpected type of revert event: %T, expected RevertEventV7Type", event) | ||
} | ||
|
||
// delete all batches from revertBatch.StartBatchIndex (inclusive) to revertBatch.FinishBatchIndex (inclusive) | ||
for i := revertBatch.StartBatchIndex().Uint64(); i <= revertBatch.FinishBatchIndex().Uint64(); i++ { | ||
bq.deleteBatch(i) | ||
} | ||
default: | ||
return fmt.Errorf("unexpected type of revert event: %T", event) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (bq *BatchQueue) deleteBatch(batchIndex uint64) (deleted bool) { | ||
batchHeapElement, exists := bq.batchesMap.Get(batchIndex) | ||
if !exists { | ||
return | ||
return false | ||
} | ||
|
||
bq.batchesMap.Delete(batch.BatchIndex()) | ||
bq.batchesMap.Delete(batchIndex) | ||
bq.batches.Remove(batchHeapElement) | ||
|
||
// we store here min height of currently loaded batches to be able to start syncing from the same place in case of restart | ||
// TODO: we should store this information when the batch is done being processed to avoid inconsistencies | ||
rawdb.WriteDASyncedL1BlockNumber(bq.db, batch.L1BlockNumber()-1) | ||
return true | ||
} | ||
|
||
// processAndDeleteBatch deletes data committed in the batch from map, because this batch is reverted or finalized | ||
// updates DASyncedL1BlockNumber | ||
func (bq *BatchQueue) processAndDeleteBatch(batch da.Entry) da.EntryWithBlocks { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems like it's only called for finalized batches, not for reverts, is that intentional? Seems like for reverts we don't call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah the comment was outdated, it was used before but not anymore. |
||
if !bq.deleteBatch(batch.BatchIndex()) { | ||
return nil | ||
} | ||
|
||
entryWithBlocks, ok := batch.(da.EntryWithBlocks) | ||
if !ok { | ||
// this should only happen if we delete a reverted batch | ||
return nil | ||
} | ||
|
||
// sanity check that the next batch is the one we expect | ||
if bq.previousBatch.BatchIndex > 0 && bq.previousBatch.BatchIndex+1 != entryWithBlocks.BatchIndex() { | ||
log.Info("BatchQueue: skipping batch ", "currentBatch", entryWithBlocks.BatchIndex(), "previousBatch", bq.previousBatch.BatchIndex) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this happen? If not, why is this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this might happen if we specify a wrong batch in recovery mode for example |
||
return nil | ||
} | ||
|
||
// carry forward the total L1 messages popped from the previous batch | ||
entryWithBlocks.SetParentTotalL1MessagePopped(bq.previousBatch.TotalL1MessagesPopped) | ||
|
||
// we store the previous batch as it has been completely processed which we know because the next batch is requested within the pipeline. | ||
// In case of a restart or crash we can continue from the last processed batch (and its metadata). | ||
rawdb.WriteDAProcessedBatchMeta(bq.db, bq.previousBatch) | ||
|
||
log.Info("processing batch", "batchIndex", entryWithBlocks.BatchIndex(), "L1BlockNumber", entryWithBlocks.L1BlockNumber(), "totalL1MessagesPopped", entryWithBlocks.TotalL1MessagesPopped(), "previousBatch", bq.previousBatch.BatchIndex, "previousL1BlockNumber", bq.previousBatch.L1BlockNumber, "previous TotalL1MessagesPopped", bq.previousBatch.TotalL1MessagesPopped) | ||
|
||
bq.previousBatch = &rawdb.DAProcessedBatchMeta{ | ||
L1BlockNumber: entryWithBlocks.L1BlockNumber(), | ||
BatchIndex: entryWithBlocks.BatchIndex(), | ||
TotalL1MessagesPopped: entryWithBlocks.TotalL1MessagesPopped(), | ||
} | ||
|
||
return entryWithBlocks | ||
} | ||
|
||
func (bq *BatchQueue) Reset(height uint64) { | ||
func (bq *BatchQueue) Reset(lastProcessedBatchMeta *rawdb.DAProcessedBatchMeta) { | ||
bq.batches.Clear() | ||
bq.batchesMap.Clear() | ||
bq.lastFinalizedBatchIndex = 0 | ||
bq.DAQueue.Reset(height) | ||
bq.lastFinalizedBatchIndex = lastProcessedBatchMeta.BatchIndex | ||
bq.previousBatch = lastProcessedBatchMeta | ||
bq.DAQueue.Reset(lastProcessedBatchMeta) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the behavior if a node is not updated? Fail at the EuclidV2 transition, but then recover after the node is upgraded?