From 0e043d55b4e71257d40675903c38b0940723f8ac Mon Sep 17 00:00:00 2001
From: kasey <489222+kasey@users.noreply.github.com>
Date: Sat, 6 Jan 2024 17:47:09 -0600
Subject: [PATCH] VerifiedROBlobs in initial-sync (#13351)

* Use VerifiedROBlobs in initial-sync

* Update beacon-chain/das/cache.go

Co-authored-by: Justin Traglia <95511699+jtraglia@users.noreply.github.com>

* Apply suggestions from code review

comment fixes

Co-authored-by: Justin Traglia <95511699+jtraglia@users.noreply.github.com>

* fix lint error from gh web ui

* deepsource fixes

* more deepsource

* fix init wiring

* mark blobless blocks verified in batch mode

* move sig check after parent checks

* validate block commitment length at start of da check

* remove vestigial locking

* rm more copy-locksta

* rm old comment

* fail the entire batch if any sidecar fails

* lint

* skip redundant checks, fix len check

* assume sig and proposer checks passed for block

* inherits most checks from processed block

* Assume block processing handles most checks

* lint

* cleanup unused call and gaz

* more detailed logging for e2e

* fix bad refactor breaking non-finalized init-sync

* self-review cleanup

* gaz

* Update beacon-chain/verification/blob.go

Co-authored-by: Justin Traglia <95511699+jtraglia@users.noreply.github.com>

* terence and justin feedback

---------

Co-authored-by: Kasey Kirkham
Co-authored-by: Justin Traglia <95511699+jtraglia@users.noreply.github.com>
---
 beacon-chain/blockchain/BUILD.bazel           |   2 +
 beacon-chain/blockchain/kzg/validation.go     |  23 +-
 beacon-chain/blockchain/process_block.go      |  38 +---
 beacon-chain/blockchain/process_block_test.go |  77 +------
 beacon-chain/blockchain/receive_block.go      |  26 ++-
 beacon-chain/blockchain/receive_block_test.go |   7 +-
 beacon-chain/blockchain/testing/BUILD.bazel   |   1 +
 beacon-chain/blockchain/testing/mock.go       |  21 +-
 beacon-chain/das/BUILD.bazel                  |  49 ++++
 beacon-chain/das/availability.go              | 149 ++++++++++++
 beacon-chain/das/availability_test.go         | 214 ++++++++++++++++++
 beacon-chain/das/cache.go                     | 117 ++++++++++
 beacon-chain/das/cache_test.go                |  25 ++
 beacon-chain/das/iface.go                     |  19 ++
 beacon-chain/das/mock.go                      |  32 +++
 beacon-chain/node/node.go                     |  10 +-
 .../rpc/prysm/v1alpha1/validator/proposer.go  |   2 +-
 beacon-chain/sync/BUILD.bazel                 |   1 -
 beacon-chain/sync/initial-sync/BUILD.bazel    |   6 +
 .../sync/initial-sync/blocks_fetcher.go       |  22 +-
 .../sync/initial-sync/blocks_fetcher_test.go  |  12 +-
 .../sync/initial-sync/blocks_fetcher_utils.go |   2 +-
 .../sync/initial-sync/blocks_queue.go         |   2 +-
 .../sync/initial-sync/blocks_queue_test.go    |  14 +-
 beacon-chain/sync/initial-sync/fsm.go         |   4 +-
 beacon-chain/sync/initial-sync/round_robin.go | 102 ++++-----
 .../sync/initial-sync/round_robin_test.go     |  43 ++--
 beacon-chain/sync/initial-sync/service.go     |  43 +++-
 .../sync/initial-sync/service_test.go         |   3 +
 .../sync/initial-sync/verification.go         |  99 ++++++++
 beacon-chain/sync/mock_blob_verifier.go       |  84 -------
 beacon-chain/sync/pending_blocks_queue.go     |   2 +-
 beacon-chain/sync/service.go                  |  10 +-
 beacon-chain/sync/subscriber_beacon_blocks.go |   2 +-
 beacon-chain/sync/validate_blob.go            |   4 +-
 beacon-chain/sync/validate_blob_test.go       |  52 ++---
 beacon-chain/verification/BUILD.bazel         |   2 +
 beacon-chain/verification/blob.go             |  89 +++++---
 beacon-chain/verification/blob_test.go        |  84 +++----
 beacon-chain/verification/fake.go             |  27 ++-
 beacon-chain/verification/initializer.go      |   6 +-
 beacon-chain/verification/interface.go        |  31 +++
 beacon-chain/verification/mock.go             |  74 ++++++
 consensus-types/blocks/roblock.go             |  16 +-
 runtime/logging/blob.go                       |  11 +
 .../shared/common/forkchoice/builder.go       |   4 +-
 46 files changed, 1209 insertions(+), 454 deletions(-)
 create mode 100644 beacon-chain/das/BUILD.bazel
 create mode 100644 beacon-chain/das/availability.go
 create mode 100644 beacon-chain/das/availability_test.go
 create mode 100644 beacon-chain/das/cache.go
 create mode 100644 beacon-chain/das/cache_test.go
 create mode 100644 beacon-chain/das/iface.go
 create mode 100644 beacon-chain/das/mock.go
 create mode 100644 beacon-chain/sync/initial-sync/verification.go
 delete mode 100644 beacon-chain/sync/mock_blob_verifier.go
 create mode 100644 beacon-chain/verification/interface.go
 create mode 100644 beacon-chain/verification/mock.go

diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel
index 561d4fd699e6..b69436fdc500 100644
--- a/beacon-chain/blockchain/BUILD.bazel
+++ b/beacon-chain/blockchain/BUILD.bazel
@@ -50,6 +50,7 @@ go_library(
         "//beacon-chain/core/signing:go_default_library",
         "//beacon-chain/core/time:go_default_library",
         "//beacon-chain/core/transition:go_default_library",
+        "//beacon-chain/das:go_default_library",
         "//beacon-chain/db:go_default_library",
         "//beacon-chain/db/filesystem:go_default_library",
         "//beacon-chain/db/filters:go_default_library",
@@ -141,6 +142,7 @@ go_test(
         "//beacon-chain/core/helpers:go_default_library",
         "//beacon-chain/core/signing:go_default_library",
         "//beacon-chain/core/transition:go_default_library",
+        "//beacon-chain/das:go_default_library",
         "//beacon-chain/db:go_default_library",
         "//beacon-chain/db/filesystem:go_default_library",
         "//beacon-chain/db/testing:go_default_library",
diff --git a/beacon-chain/blockchain/kzg/validation.go b/beacon-chain/blockchain/kzg/validation.go
index f4a46a02ce89..dfd09ce3ba53 100644
--- a/beacon-chain/blockchain/kzg/validation.go
+++ b/beacon-chain/blockchain/kzg/validation.go
@@ -32,9 +32,26 @@ func IsDataAvailable(commitments [][]byte, sidecars []*ethpb.DeprecatedBlobSidec
 	return kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs)
 }
 
-// VerifyROBlobCommitment is a helper that massages the fields of an ROBlob into the types needed to call VerifyBlobKZGProof.
-func VerifyROBlobCommitment(sc blocks.ROBlob) error {
-	return kzgContext.VerifyBlobKZGProof(bytesToBlob(sc.Blob), bytesToCommitment(sc.KzgCommitment), bytesToKZGProof(sc.KzgProof))
+// Verify performs single or batch verification of commitments depending on the number of given BlobSidecars.
+func Verify(sidecars ...blocks.ROBlob) error { + if len(sidecars) == 0 { + return nil + } + if len(sidecars) == 1 { + return kzgContext.VerifyBlobKZGProof( + bytesToBlob(sidecars[0].Blob), + bytesToCommitment(sidecars[0].KzgCommitment), + bytesToKZGProof(sidecars[0].KzgProof)) + } + blobs := make([]GoKZG.Blob, len(sidecars)) + cmts := make([]GoKZG.KZGCommitment, len(sidecars)) + proofs := make([]GoKZG.KZGProof, len(sidecars)) + for i, sidecar := range sidecars { + blobs[i] = bytesToBlob(sidecar.Blob) + cmts[i] = bytesToCommitment(sidecar.KzgCommitment) + proofs[i] = bytesToKZGProof(sidecar.KzgProof) + } + return kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs) } func bytesToBlob(blob []byte) (ret GoKZG.Blob) { diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 769ebc2b6e58..6f677b2ce8be 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" @@ -122,7 +123,7 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD return preStateVersion, preStateHeader, nil } -func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock) error { +func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock, avs das.AvailabilityStore) error { ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch") defer span.End() @@ -225,8 +226,8 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo return err } } - if err := s.databaseDACheck(ctx, b); err != nil { - return errors.Wrap(err, "could not validate blob data availability") + if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil { + return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot()) } args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(), JustifiedCheckpoint: jCheckpoints[i], @@ -293,37 +294,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload) } -func commitmentsToCheck(b consensusblocks.ROBlock, current primitives.Slot) [][]byte { - if b.Version() < version.Deneb { - return nil - } - // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) { - return nil - } - kzgCommitments, err := b.Block().Body().BlobKzgCommitments() - if err != nil { - return nil - } - return kzgCommitments -} - -func (s *Service) databaseDACheck(ctx context.Context, b consensusblocks.ROBlock) error { - commitments := commitmentsToCheck(b, s.CurrentSlot()) - if len(commitments) == 0 { - return nil - } - missing, err := missingIndices(s.blobStorage, b.Root(), commitments) - if err != nil { - return err - } - if len(missing) == 0 { - return nil - } - // TODO: don't worry that this error isn't informative, it will be superceded by a detailed sidecar cache error. 
- return errors.New("not all kzg commitments are available") -} - func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error { e := coreTime.CurrentEpoch(st) if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil { diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 1d67a364d96e..ad3860ed91f0 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -1,7 +1,6 @@ package blockchain import ( - "bytes" "context" "fmt" "math/big" @@ -17,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" @@ -40,7 +40,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/util" prysmTime "github.com/prysmaticlabs/prysm/v4/time" - "github.com/prysmaticlabs/prysm/v4/time/slots" logTest "github.com/sirupsen/logrus/hooks/test" ) @@ -69,7 +68,7 @@ func TestStore_OnBlockBatch(t *testing.T) { require.NoError(t, err) blks = append(blks, rwsb) } - err := service.onBlockBatch(ctx, blks) + err := service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{}) require.NoError(t, err) jcp := service.CurrentJustifiedCheckpt() jroot := bytesutil.ToBytes32(jcp.Root) @@ -99,7 +98,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) { require.NoError(t, service.saveInitSyncBlock(ctx, rwsb.Root(), wsb)) blks = append(blks, rwsb) } - require.NoError(t, service.onBlockBatch(ctx, blks)) + require.NoError(t, service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{})) } func TestCachedPreState_CanGetFromStateSummary(t *testing.T) { @@ -1946,7 +1945,7 @@ func TestNoViableHead_Reboot(t *testing.T) { rwsb, err := consensusblocks.NewROBlock(wsb) require.NoError(t, err) // We use onBlockBatch here because the valid chain is missing in forkchoice - require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb})) + require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}, &das.MockAvailabilityStore{})) // Check that the head is now VALID and the node is not optimistic require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot())) headRoot, err = service.HeadRoot(ctx) @@ -2049,74 +2048,6 @@ func driftGenesisTime(s *Service, slot, delay int64) { s.SetGenesisTime(time.Unix(time.Now().Unix()-offset, 0)) } -func Test_commitmentsToCheck(t *testing.T) { - windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) - require.NoError(t, err) - commits := [][]byte{ - bytesutil.PadTo([]byte("a"), 48), - bytesutil.PadTo([]byte("b"), 48), - bytesutil.PadTo([]byte("c"), 48), - bytesutil.PadTo([]byte("d"), 48), - } - cases := []struct { - name string - commits [][]byte - block func(*testing.T) consensusblocks.ROBlock - slot primitives.Slot - }{ - { - name: "pre deneb", - block: func(t *testing.T) consensusblocks.ROBlock { - bb := util.NewBeaconBlockBellatrix() - sb, err := consensusblocks.NewSignedBeaconBlock(bb) - require.NoError(t, err) - rb, err := consensusblocks.NewROBlock(sb) - require.NoError(t, err) - return rb - }, - }, - { - name: 
"commitments within da", - block: func(t *testing.T) consensusblocks.ROBlock { - d := util.NewBeaconBlockDeneb() - d.Block.Body.BlobKzgCommitments = commits - d.Block.Slot = 100 - sb, err := consensusblocks.NewSignedBeaconBlock(d) - require.NoError(t, err) - rb, err := consensusblocks.NewROBlock(sb) - require.NoError(t, err) - return rb - }, - commits: commits, - slot: 100, - }, - { - name: "commitments outside da", - block: func(t *testing.T) consensusblocks.ROBlock { - d := util.NewBeaconBlockDeneb() - // block is from slot 0, "current slot" is window size +1 (so outside the window) - d.Block.Body.BlobKzgCommitments = commits - sb, err := consensusblocks.NewSignedBeaconBlock(d) - require.NoError(t, err) - rb, err := consensusblocks.NewROBlock(sb) - require.NoError(t, err) - return rb - }, - slot: windowSlots + 1, - }, - } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - b := c.block(t) - co := commitmentsToCheck(b, c.slot) - require.Equal(t, len(c.commits), len(co)) - for i := 0; i < len(c.commits); i++ { - require.Equal(t, true, bytes.Equal(c.commits[i], co[i])) - } - }) - } -} - func TestMissingIndices(t *testing.T) { cases := []struct { name string diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 0ebb483c6845..581b0bdbd3d5 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/config/features" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -33,8 +34,8 @@ var epochsSinceFinalitySaveHotStateDB = primitives.Epoch(100) // BlockReceiver interface defines the methods of chain service for receiving and processing new blocks. type BlockReceiver interface { - ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error - ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error + ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error + ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error HasBlock(ctx context.Context, root [32]byte) bool RecentBlockSlot(root [32]byte) (primitives.Slot, error) BlockBeingSynced([32]byte) bool @@ -56,7 +57,7 @@ type SlashingReceiver interface { // 1. Validate block, apply state transition and update checkpoints // 2. Apply fork choice to the processed block // 3. 
Save latest head info -func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { +func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error { ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock") defer span.End() // Return early if the block has been synced @@ -72,6 +73,10 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig if err != nil { return err } + rob, err := blocks.NewROBlockWithRoot(block, blockRoot) + if err != nil { + return err + } preState, err := s.getBlockPreState(ctx, blockCopy.Block()) if err != nil { @@ -106,10 +111,15 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig if err := eg.Wait(); err != nil { return err } - daStartTime := time.Now() - if err := s.isDataAvailable(ctx, blockRoot, blockCopy); err != nil { - return errors.Wrap(err, "could not validate blob data availability") + if avs != nil { + if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil { + return errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)") + } + } else { + if err := s.isDataAvailable(ctx, blockRoot, blockCopy); err != nil { + return errors.Wrap(err, "could not validate blob data availability") + } } daWaitedTime := time.Since(daStartTime) @@ -203,7 +213,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig // ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning // the state, performing batch verification of all collected signatures and then performing the appropriate // actions for a block post-transition. -func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error { +func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error { ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch") defer span.End() @@ -211,7 +221,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock defer s.cfg.ForkChoiceStore.Unlock() // Apply state transition on the incoming newly received block batches, one by one. 
- if err := s.onBlockBatch(ctx, blocks); err != nil { + if err := s.onBlockBatch(ctx, blocks, avs); err != nil { err := errors.Wrap(err, "could not process block in batch") tracing.AnnotateError(span, err) return err diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index 5f135362243e..1cfccca74ac4 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -8,6 +8,7 @@ import ( blockchainTesting "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -146,7 +147,7 @@ func TestService_ReceiveBlock(t *testing.T) { require.NoError(t, err) wsb, err := blocks.NewSignedBeaconBlock(tt.args.block) require.NoError(t, err) - err = s.ReceiveBlock(ctx, wsb, root) + err = s.ReceiveBlock(ctx, wsb, root, nil) if tt.wantedErr != "" { assert.ErrorContains(t, tt.wantedErr, err) } else { @@ -179,7 +180,7 @@ func TestService_ReceiveBlockUpdateHead(t *testing.T) { go func() { wsb, err := blocks.NewSignedBeaconBlock(b) require.NoError(t, err) - require.NoError(t, s.ReceiveBlock(ctx, wsb, root)) + require.NoError(t, s.ReceiveBlock(ctx, wsb, root, nil)) wg.Done() }() wg.Wait() @@ -243,7 +244,7 @@ func TestService_ReceiveBlockBatch(t *testing.T) { require.NoError(t, err) rwsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb}) + err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb}, &das.MockAvailabilityStore{}) if tt.wantedErr != "" { assert.ErrorContains(t, tt.wantedErr, err) } else { diff --git a/beacon-chain/blockchain/testing/BUILD.bazel b/beacon-chain/blockchain/testing/BUILD.bazel index a4d0345e7dae..f461160748c9 100644 --- a/beacon-chain/blockchain/testing/BUILD.bazel +++ b/beacon-chain/blockchain/testing/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/state:go_default_library", diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index c47cf5e057a0..c9b21ce43a71 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -16,6 +16,7 @@ import ( opfeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" @@ -208,7 +209,7 @@ func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interf } // ReceiveBlockBatch processes blocks in batches from initial-sync. 
-func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock) error { +func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock, _ das.AvailabilityStore) error { if s.State == nil { return ErrNilState } @@ -238,7 +239,7 @@ func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBl } // ReceiveBlock mocks ReceiveBlock method in chain service. -func (s *ChainService) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte) error { +func (s *ChainService) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, _ [32]byte, _ das.AvailabilityStore) error { if s.ReceiveBlockMockErr != nil { return s.ReceiveBlockMockErr } @@ -320,7 +321,7 @@ func (s *ChainService) PreviousJustifiedCheckpt() *ethpb.Checkpoint { } // ReceiveAttestation mocks ReceiveAttestation method in chain service. -func (_ *ChainService) ReceiveAttestation(_ context.Context, _ *ethpb.Attestation) error { +func (*ChainService) ReceiveAttestation(_ context.Context, _ *ethpb.Attestation) error { return nil } @@ -400,12 +401,12 @@ func (s *ChainService) RecentBlockSlot([32]byte) (primitives.Slot, error) { } // HeadGenesisValidatorsRoot mocks HeadGenesisValidatorsRoot method in chain service. -func (_ *ChainService) HeadGenesisValidatorsRoot() [32]byte { +func (*ChainService) HeadGenesisValidatorsRoot() [32]byte { return [32]byte{} } // VerifyLmdFfgConsistency mocks VerifyLmdFfgConsistency and always returns nil. -func (_ *ChainService) VerifyLmdFfgConsistency(_ context.Context, a *ethpb.Attestation) error { +func (*ChainService) VerifyLmdFfgConsistency(_ context.Context, a *ethpb.Attestation) error { if !bytes.Equal(a.Data.BeaconBlockRoot, a.Data.Target.Root) { return errors.New("LMD and FFG miss matched") } @@ -413,7 +414,7 @@ func (_ *ChainService) VerifyLmdFfgConsistency(_ context.Context, a *ethpb.Attes } // ChainHeads mocks ChainHeads and always return nil. -func (_ *ChainService) ChainHeads() ([][32]byte, []primitives.Slot) { +func (*ChainService) ChainHeads() ([][32]byte, []primitives.Slot) { return [][32]byte{ bytesutil.ToBytes32(bytesutil.PadTo([]byte("foo"), 32)), bytesutil.ToBytes32(bytesutil.PadTo([]byte("bar"), 32)), @@ -422,7 +423,7 @@ func (_ *ChainService) ChainHeads() ([][32]byte, []primitives.Slot) { } // HeadPublicKeyToValidatorIndex mocks HeadPublicKeyToValidatorIndex and always return 0 and true. -func (_ *ChainService) HeadPublicKeyToValidatorIndex(_ [fieldparams.BLSPubkeyLength]byte) (primitives.ValidatorIndex, bool) { +func (*ChainService) HeadPublicKeyToValidatorIndex(_ [fieldparams.BLSPubkeyLength]byte) (primitives.ValidatorIndex, bool) { return 0, true } @@ -486,7 +487,7 @@ func (s *ChainService) UpdateHead(ctx context.Context, slot primitives.Slot) { } // ReceiveAttesterSlashing mocks the same method in the chain service. -func (s *ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {} +func (*ChainService) ReceiveAttesterSlashing(context.Context, *ethpb.AttesterSlashing) {} // IsFinalized mocks the same method in the chain service. 
func (s *ChainService) IsFinalized(_ context.Context, blockRoot [32]byte) bool { @@ -599,12 +600,12 @@ func (s *ChainService) ProposerBoost() [32]byte { } // FinalizedBlockHash mocks the same method in the chain service -func (s *ChainService) FinalizedBlockHash() [32]byte { +func (*ChainService) FinalizedBlockHash() [32]byte { return [32]byte{} } // UnrealizedJustifiedPayloadBlockHash mocks the same method in the chain service -func (s *ChainService) UnrealizedJustifiedPayloadBlockHash() [32]byte { +func (*ChainService) UnrealizedJustifiedPayloadBlockHash() [32]byte { return [32]byte{} } diff --git a/beacon-chain/das/BUILD.bazel b/beacon-chain/das/BUILD.bazel new file mode 100644 index 000000000000..8666d00439a2 --- /dev/null +++ b/beacon-chain/das/BUILD.bazel @@ -0,0 +1,49 @@ +load("@prysm//tools/go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "availability.go", + "cache.go", + "iface.go", + "mock.go", + ], + importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/das", + visibility = ["//visibility:public"], + deps = [ + "//beacon-chain/db/filesystem:go_default_library", + "//beacon-chain/verification:go_default_library", + "//config/fieldparams:go_default_library", + "//config/params:go_default_library", + "//consensus-types/blocks:go_default_library", + "//consensus-types/primitives:go_default_library", + "//runtime/logging:go_default_library", + "//runtime/version:go_default_library", + "//time/slots:go_default_library", + "@com_github_pkg_errors//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "availability_test.go", + "cache_test.go", + ], + embed = [":go_default_library"], + deps = [ + "//beacon-chain/db/filesystem:go_default_library", + "//beacon-chain/verification:go_default_library", + "//config/fieldparams:go_default_library", + "//config/params:go_default_library", + "//consensus-types/blocks:go_default_library", + "//consensus-types/primitives:go_default_library", + "//encoding/bytesutil:go_default_library", + "//proto/prysm/v1alpha1:go_default_library", + "//testing/require:go_default_library", + "//testing/util:go_default_library", + "//time/slots:go_default_library", + "@com_github_pkg_errors//:go_default_library", + ], +) diff --git a/beacon-chain/das/availability.go b/beacon-chain/das/availability.go new file mode 100644 index 000000000000..0f0e23077238 --- /dev/null +++ b/beacon-chain/das/availability.go @@ -0,0 +1,149 @@ +package das + +import ( + "context" + "fmt" + + errors "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/runtime/logging" + "github.com/prysmaticlabs/prysm/v4/runtime/version" + "github.com/prysmaticlabs/prysm/v4/time/slots" + log "github.com/sirupsen/logrus" +) + +var ( + errMixedRoots = errors.New("BlobSidecars must all be for the same block") +) + +// LazilyPersistentStore is an implementation of AvailabilityStore to be used when batch syncing. +// This implementation will hold any blobs passed to Persist until the IsDataAvailable is called for their +// block, at which time they will undergo full verification and be saved to the disk. 
+type LazilyPersistentStore struct { + store *filesystem.BlobStorage + cache *cache + verifier BlobBatchVerifier +} + +var _ AvailabilityStore = &LazilyPersistentStore{} + +// BlobBatchVerifier enables LazyAvailabilityStore to manage the verification process +// going from ROBlob->VerifiedROBlob, while avoiding the decision of which individual verifications +// to run and in what order. Since LazilyPersistentStore always tries to verify and save blobs only when +// they are all available, the interface takes a slice of blobs, enabling the implementation to optimize +// batch verification. +type BlobBatchVerifier interface { + VerifiedROBlobs(ctx context.Context, blk blocks.ROBlock, sc []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) +} + +// NewLazilyPersistentStore creates a new LazilyPersistentStore. This constructor should always be used +// when creating a LazilyPersistentStore because it needs to initialize the cache under the hood. +func NewLazilyPersistentStore(store *filesystem.BlobStorage, verifier BlobBatchVerifier) *LazilyPersistentStore { + return &LazilyPersistentStore{ + store: store, + cache: newCache(), + verifier: verifier, + } +} + +// Persist adds blobs to the working blob cache. Blobs stored in this cache will be persisted +// for at least as long as the node is running. Once IsDataAvailable succeeds, all blobs referenced +// by the given block are guaranteed to be persisted for the remainder of the retention period. +func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.ROBlob) error { + if len(sc) == 0 { + return nil + } + if len(sc) > 1 { + first := sc[0].BlockRoot() + for i := 1; i < len(sc); i++ { + if first != sc[i].BlockRoot() { + return errMixedRoots + } + } + } + if !params.WithinDAPeriod(slots.ToEpoch(sc[0].Slot()), slots.ToEpoch(current)) { + return nil + } + key := keyFromSidecar(sc[0]) + entry := s.cache.ensure(key) + for i := range sc { + if err := entry.stash(&sc[i]); err != nil { + return err + } + } + return nil +} + +// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified. +// BlobSidecars already in the db are assumed to have been previously verified against the block. +func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error { + blockCommitments, err := commitmentsToCheck(b, current) + if err != nil { + return errors.Wrapf(err, "could check data availability for block %#x", b.Root()) + } + // Return early for blocks that are pre-deneb or which do not have any commitments. + if blockCommitments.count() == 0 { + return nil + } + + key := keyFromBlock(b) + entry := s.cache.ensure(key) + defer s.cache.delete(key) + root := b.Root() + // Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent. + // We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather + // ignore their response and decrease their peer score. + sidecars, err := entry.filter(root, blockCommitments) + if err != nil { + return errors.Wrap(err, "incomplete BlobSidecar batch") + } + // Do thorough verifications of each BlobSidecar for the block. + // Same as above, we don't save BlobSidecars if there are any problems with the batch. 
+ vscs, err := s.verifier.VerifiedROBlobs(ctx, b, sidecars) + if err != nil { + var me verification.VerificationMultiError + ok := errors.As(err, &me) + if ok { + fails := me.Failures() + lf := make(log.Fields, len(fails)) + for i := range fails { + lf[fmt.Sprintf("fail_%d", i)] = fails[i].Error() + } + log.WithFields(lf).WithFields(logging.BlockFieldsFromBlob(sidecars[0])). + Debug("invalid BlobSidecars received") + } + return errors.Wrapf(err, "invalid BlobSidecars received for block %#x", root) + } + // Ensure that each BlobSidecar is written to disk. + for i := range vscs { + if err := s.store.Save(vscs[i]); err != nil { + return errors.Wrapf(err, "failed to save BlobSidecar index %d for block %#x", vscs[i].Index, root) + } + } + // All BlobSidecars are persisted - da check succeeds. + return nil +} + +func commitmentsToCheck(b blocks.ROBlock, current primitives.Slot) (safeCommitmentArray, error) { + var ar safeCommitmentArray + if b.Version() < version.Deneb { + return ar, nil + } + // We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS + if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) { + return ar, nil + } + kc, err := b.Block().Body().BlobKzgCommitments() + if err != nil { + return ar, err + } + if len(kc) > len(ar) { + return ar, errIndexOutOfBounds + } + copy(ar[:], kc) + return ar, nil +} diff --git a/beacon-chain/das/availability_test.go b/beacon-chain/das/availability_test.go new file mode 100644 index 000000000000..203816ce07ff --- /dev/null +++ b/beacon-chain/das/availability_test.go @@ -0,0 +1,214 @@ +package das + +import ( + "bytes" + "context" + "testing" + + errors "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/testing/util" + "github.com/prysmaticlabs/prysm/v4/time/slots" +) + +func Test_commitmentsToCheck(t *testing.T) { + windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) + require.NoError(t, err) + commits := [][]byte{ + bytesutil.PadTo([]byte("a"), 48), + bytesutil.PadTo([]byte("b"), 48), + bytesutil.PadTo([]byte("c"), 48), + bytesutil.PadTo([]byte("d"), 48), + } + cases := []struct { + name string + commits [][]byte + block func(*testing.T) blocks.ROBlock + slot primitives.Slot + err error + }{ + { + name: "pre deneb", + block: func(t *testing.T) blocks.ROBlock { + bb := util.NewBeaconBlockBellatrix() + sb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + return rb + }, + }, + { + name: "commitments within da", + block: func(t *testing.T) blocks.ROBlock { + d := util.NewBeaconBlockDeneb() + d.Block.Body.BlobKzgCommitments = commits + d.Block.Slot = 100 + sb, err := blocks.NewSignedBeaconBlock(d) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + return rb + }, + commits: commits, + slot: 100, + }, + { + name: "commitments outside da", + block: func(t *testing.T) blocks.ROBlock { + d := 
util.NewBeaconBlockDeneb() + // block is from slot 0, "current slot" is window size +1 (so outside the window) + d.Block.Body.BlobKzgCommitments = commits + sb, err := blocks.NewSignedBeaconBlock(d) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + return rb + }, + slot: windowSlots + 1, + }, + { + name: "excessive commitments", + block: func(t *testing.T) blocks.ROBlock { + d := util.NewBeaconBlockDeneb() + d.Block.Slot = 100 + // block is from slot 0, "current slot" is window size +1 (so outside the window) + d.Block.Body.BlobKzgCommitments = commits + // Double the number of commitments, assert that this is over the limit + d.Block.Body.BlobKzgCommitments = append(commits, d.Block.Body.BlobKzgCommitments...) + sb, err := blocks.NewSignedBeaconBlock(d) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + c, err := rb.Block().Body().BlobKzgCommitments() + require.NoError(t, err) + require.Equal(t, true, len(c) > fieldparams.MaxBlobsPerBlock) + return rb + }, + slot: windowSlots + 1, + err: errIndexOutOfBounds, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + b := c.block(t) + co, err := commitmentsToCheck(b, c.slot) + if c.err != nil { + require.ErrorIs(t, err, c.err) + } else { + require.NoError(t, err) + } + require.Equal(t, len(c.commits), co.count()) + for i := 0; i < len(c.commits); i++ { + require.Equal(t, true, bytes.Equal(c.commits[i], co[i])) + } + }) + } +} + +func daAlwaysSucceeds(_ [][]byte, _ []*ethpb.BlobSidecar) error { + return nil +} + +type mockDA struct { + t *testing.T + scs []blocks.ROBlob + err error +} + +func TestLazilyPersistent_Missing(t *testing.T) { + ctx := context.Background() + store := filesystem.NewEphemeralBlobStorage(t) + + blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3) + + mbv := &mockBlobBatchVerifier{t: t, scs: scs} + as := NewLazilyPersistentStore(store, mbv) + + // Only one commitment persisted, should return error with other indices + require.NoError(t, as.Persist(1, scs[2])) + err := as.IsDataAvailable(ctx, 1, blk) + require.ErrorIs(t, err, errMissingSidecar) + + // All but one persisted, return missing idx + require.NoError(t, as.Persist(1, scs[0])) + err = as.IsDataAvailable(ctx, 1, blk) + require.ErrorIs(t, err, errMissingSidecar) + + // All persisted, return nil + require.NoError(t, as.Persist(1, scs...)) + + require.NoError(t, as.IsDataAvailable(ctx, 1, blk)) +} + +func TestLazilyPersistent_Mismatch(t *testing.T) { + ctx := context.Background() + store := filesystem.NewEphemeralBlobStorage(t) + + blk, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 3) + + mbv := &mockBlobBatchVerifier{t: t, err: errors.New("kzg check should not run")} + scs[0].KzgCommitment = bytesutil.PadTo([]byte("nope"), 48) + as := NewLazilyPersistentStore(store, mbv) + + // Only one commitment persisted, should return error with other indices + require.NoError(t, as.Persist(1, scs[0])) + err := as.IsDataAvailable(ctx, 1, blk) + require.NotNil(t, err) + require.ErrorIs(t, err, errCommitmentMismatch) +} + +func TestLazyPersistOnceCommitted(t *testing.T) { + _, scs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 6) + as := NewLazilyPersistentStore(filesystem.NewEphemeralBlobStorage(t), &mockBlobBatchVerifier{}) + // stashes as expected + require.NoError(t, as.Persist(1, scs...)) + // ignores duplicates + require.ErrorIs(t, as.Persist(1, scs...), ErrDuplicateSidecar) + + // ignores index out of bound + scs[0].Index = 6 + 
require.ErrorIs(t, as.Persist(1, scs[0]), errIndexOutOfBounds) + + _, more := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 4) + // ignores sidecars before the retention period + slotOOB, err := slots.EpochStart(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest) + require.NoError(t, err) + require.NoError(t, as.Persist(32+slotOOB, more[0])) + + // doesn't ignore new sidecars with a different block root + require.NoError(t, as.Persist(1, more...)) +} + +type mockBlobBatchVerifier struct { + t *testing.T + scs []blocks.ROBlob + err error + verified map[[32]byte]primitives.Slot +} + +var _ BlobBatchVerifier = &mockBlobBatchVerifier{} + +func (m *mockBlobBatchVerifier) VerifiedROBlobs(_ context.Context, _ blocks.ROBlock, scs []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) { + require.Equal(m.t, len(scs), len(m.scs)) + for i := range m.scs { + require.Equal(m.t, m.scs[i], scs[i]) + } + vscs := verification.FakeVerifySliceForTest(m.t, scs) + return vscs, m.err +} + +func (m *mockBlobBatchVerifier) MarkVerified(root [32]byte, slot primitives.Slot) { + if m.verified == nil { + m.verified = make(map[[32]byte]primitives.Slot) + } + m.verified[root] = slot +} diff --git a/beacon-chain/das/cache.go b/beacon-chain/das/cache.go new file mode 100644 index 000000000000..7aef135463c6 --- /dev/null +++ b/beacon-chain/das/cache.go @@ -0,0 +1,117 @@ +package das + +import ( + "bytes" + + "github.com/pkg/errors" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +var ( + ErrDuplicateSidecar = errors.New("duplicate sidecar stashed in AvailabilityStore") + errIndexOutOfBounds = errors.New("sidecar.index > MAX_BLOBS_PER_BLOCK") + errCommitmentMismatch = errors.New("KzgCommitment of sidecar in cache did not match block commitment") + errMissingSidecar = errors.New("no sidecar in cache for block commitment") +) + +// cacheKey includes the slot so that we can easily iterate through the cache and compare +// slots for eviction purposes. Whether the input is the block or the sidecar, we always have +// the root+slot when interacting with the cache, so it isn't an inconvenience to use both. +type cacheKey struct { + slot primitives.Slot + root [32]byte +} + +type cache struct { + entries map[cacheKey]*cacheEntry +} + +func newCache() *cache { + return &cache{entries: make(map[cacheKey]*cacheEntry)} +} + +// keyFromSidecar is a convenience method for constructing a cacheKey from a BlobSidecar value. +func keyFromSidecar(sc blocks.ROBlob) cacheKey { + return cacheKey{slot: sc.Slot(), root: sc.BlockRoot()} +} + +// keyFromBlock is a convenience method for constructing a cacheKey from a ROBlock value. +func keyFromBlock(b blocks.ROBlock) cacheKey { + return cacheKey{slot: b.Block().Slot(), root: b.Root()} +} + +// ensure returns the entry for the given key, creating it if it isn't already present. +func (c *cache) ensure(key cacheKey) *cacheEntry { + e, ok := c.entries[key] + if !ok { + e = &cacheEntry{} + c.entries[key] = e + } + return e +} + +// delete removes the cache entry from the cache. +func (c *cache) delete(key cacheKey) { + delete(c.entries, key) +} + +// cacheEntry holds a fixed-length cache of BlobSidecars. +type cacheEntry struct { + scs [fieldparams.MaxBlobsPerBlock]*blocks.ROBlob +} + +// stash adds an item to the in-memory cache of BlobSidecars. +// Only the first BlobSidecar of a given Index will be kept in the cache. 
+// stash will return an error if the given blob is already in the cache, or if the Index is out of bounds. +func (e *cacheEntry) stash(sc *blocks.ROBlob) error { + if sc.Index >= fieldparams.MaxBlobsPerBlock { + return errors.Wrapf(errIndexOutOfBounds, "index=%d", sc.Index) + } + if e.scs[sc.Index] != nil { + return errors.Wrapf(ErrDuplicateSidecar, "root=%#x, index=%d, commitment=%#x", sc.BlockRoot(), sc.Index, sc.KzgCommitment) + } + e.scs[sc.Index] = sc + return nil +} + +// filter evicts sidecars that are not committed to by the block and returns custom +// errors if the cache is missing any of the commitments, or if the commitments in +// the cache do not match those found in the block. If err is nil, then all expected +// commitments were found in the cache and the sidecar slice return value can be used +// to perform a DA check against the cached sidecars. +func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROBlob, error) { + scs := make([]blocks.ROBlob, kc.count()) + for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ { + if kc[i] == nil { + if e.scs[i] != nil { + return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment) + } + continue + } + + if e.scs[i] == nil { + return nil, errors.Wrapf(errMissingSidecar, "root=%#x, index=%#x", root, i) + } + if !bytes.Equal(kc[i], e.scs[i].KzgCommitment) { + return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.scs[i].KzgCommitment, kc[i]) + } + scs[i] = *e.scs[i] + } + + return scs, nil +} + +// safeCommitemntArray is a fixed size array of commitment byte slices. This is helpful for avoiding +// gratuitous bounds checks. +type safeCommitmentArray [fieldparams.MaxBlobsPerBlock][]byte + +func (s safeCommitmentArray) count() int { + for i := range s { + if s[i] == nil { + return i + } + } + return fieldparams.MaxBlobsPerBlock +} diff --git a/beacon-chain/das/cache_test.go b/beacon-chain/das/cache_test.go new file mode 100644 index 000000000000..afec2ad7c19c --- /dev/null +++ b/beacon-chain/das/cache_test.go @@ -0,0 +1,25 @@ +package das + +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func TestCacheEnsureDelete(t *testing.T) { + c := newCache() + require.Equal(t, 0, len(c.entries)) + root := bytesutil.ToBytes32([]byte("root")) + slot := primitives.Slot(1234) + k := cacheKey{root: root, slot: slot} + entry := c.ensure(k) + require.Equal(t, 1, len(c.entries)) + require.Equal(t, c.entries[k], entry) + + c.delete(k) + require.Equal(t, 0, len(c.entries)) + var nilEntry *cacheEntry + require.Equal(t, nilEntry, c.entries[k]) +} diff --git a/beacon-chain/das/iface.go b/beacon-chain/das/iface.go new file mode 100644 index 000000000000..1e15f4778a02 --- /dev/null +++ b/beacon-chain/das/iface.go @@ -0,0 +1,19 @@ +package das + +import ( + "context" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +// AvailabilityStore describes a component that can verify and save sidecars for a given block, and confirm previously +// verified and saved sidecars. +// Persist guarantees that the sidecar will be available to perform a DA check +// for the life of the beacon node process. 
+// IsDataAvailable guarantees that all blobs committed to in the block have been +// durably persisted before returning a non-error value. +type AvailabilityStore interface { + IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error + Persist(current primitives.Slot, sc ...blocks.ROBlob) error +} diff --git a/beacon-chain/das/mock.go b/beacon-chain/das/mock.go new file mode 100644 index 000000000000..899af9d1cf7f --- /dev/null +++ b/beacon-chain/das/mock.go @@ -0,0 +1,32 @@ +package das + +import ( + "context" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +// MockAvailabilityStore is an implementation of AvailabilityStore that can be used by other packages in tests. +type MockAvailabilityStore struct { + VerifyAvailabilityCallback func(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error + PersistBlobsCallback func(current primitives.Slot, sc ...blocks.ROBlob) error +} + +var _ AvailabilityStore = &MockAvailabilityStore{} + +// IsDataAvailable satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests. +func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error { + if m.VerifyAvailabilityCallback != nil { + return m.VerifyAvailabilityCallback(ctx, current, b) + } + return nil +} + +// Persist satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests. +func (m *MockAvailabilityStore) Persist(current primitives.Slot, sc ...blocks.ROBlob) error { + if m.PersistBlobsCallback != nil { + return m.PersistBlobsCallback(current, sc...) + } + return nil +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 227c74b4e51c..8d24039cd8e2 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -116,6 +116,7 @@ type BeaconNode struct { initialSyncComplete chan struct{} BlobStorage *filesystem.BlobStorage blobRetentionEpochs primitives.Epoch + verifyInitWaiter *verification.InitializerWaiter } // New creates a new node instance, sets up configuration options, and registers @@ -228,6 +229,8 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco return nil, err } + beacon.verifyInitWaiter = verification.NewInitializerWaiter( + beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen) if err := beacon.BlobStorage.Initialize(); err != nil { return nil, fmt.Errorf("failed to initialize blob storage: %w", err) } @@ -742,7 +745,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}) erro regularsync.WithInitialSyncComplete(initialSyncComplete), regularsync.WithStateNotifier(b), regularsync.WithBlobStorage(b.BlobStorage), - regularsync.WithVerifierWaiter(verification.NewInitializerWaiter(b.clockWaiter, forkchoice.NewROForkChoice(b.forkChoicer), b.stateGen)), + regularsync.WithVerifierWaiter(b.verifyInitWaiter), ) return b.services.RegisterService(rs) } @@ -753,6 +756,9 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error { return err } + opts := []initialsync.Option{ + initialsync.WithVerifierWaiter(b.verifyInitWaiter), + } is := initialsync.NewService(b.ctx, &initialsync.Config{ DB: b.db, Chain: chainService, @@ -762,7 +768,7 @@ func (b *BeaconNode) registerInitialSyncService(complete chan struct{}) error { ClockWaiter: b.clockWaiter, InitialSyncComplete: complete, BlobStorage: 
b.BlobStorage, - }) + }, opts...) return b.services.RegisterService(is) } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go index 744151d26edd..384f0d4fb045 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go @@ -308,7 +308,7 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si Type: blockfeed.ReceivedBlock, Data: &blockfeed.ReceivedBlockData{SignedBlock: block}, }) - return vs.BlockReceiver.ReceiveBlock(ctx, block, root) + return vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil) } // broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars. diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index cf88e4c5584a..bfe8a44ed0cc 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "fuzz_exports.go", # keep "log.go", "metrics.go", - "mock_blob_verifier.go", "options.go", "pending_attestations_queue.go", "pending_blocks_queue.go", diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 193edd52e68d..e598432e8e8e 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -12,15 +12,18 @@ go_library( "log.go", "round_robin.go", "service.go", + "verification.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync", visibility = ["//beacon-chain:__subpackages__"], deps = [ "//async/abool:go_default_library", "//beacon-chain/blockchain:go_default_library", + "//beacon-chain/blockchain/kzg:go_default_library", "//beacon-chain/core/feed/block:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/transition:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/p2p:go_default_library", @@ -38,6 +41,7 @@ go_library( "//consensus-types/primitives:go_default_library", "//container/leaky-bucket:go_default_library", "//crypto/rand:go_default_library", + "//encoding/bytesutil:go_default_library", "//math:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//runtime:go_default_library", @@ -69,6 +73,7 @@ go_test( deps = [ "//async/abool:go_default_library", "//beacon-chain/blockchain/testing:go_default_library", + "//beacon-chain/das:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/testing:go_default_library", @@ -80,6 +85,7 @@ go_test( "//beacon-chain/startup:go_default_library", "//beacon-chain/sync:go_default_library", "//beacon-chain/sync/verify:go_default_library", + "//beacon-chain/verification:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/features:go_default_library", "//config/params:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 20a391f6a5ef..e42452e6c4b1 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -120,7 +120,7 @@ type fetchRequestResponse struct { pid peer.ID start primitives.Slot count uint64 - bwb []blocks2.BlockWithVerifiedBlobs + bwb []blocks2.BlockWithROBlobs err error } @@ -263,7 +263,7 @@ func (f *blocksFetcher) handleRequest(ctx 
context.Context, start primitives.Slot response := &fetchRequestResponse{ start: start, count: count, - bwb: []blocks2.BlockWithVerifiedBlobs{}, + bwb: []blocks2.BlockWithROBlobs{}, err: nil, } @@ -304,7 +304,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer( ctx context.Context, start primitives.Slot, count uint64, peers []peer.ID, -) ([]blocks2.BlockWithVerifiedBlobs, peer.ID, error) { +) ([]blocks2.BlockWithROBlobs, peer.ID, error) { ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer") defer span.End() @@ -332,20 +332,20 @@ func (f *blocksFetcher) fetchBlocksFromPeer( return nil, "", errNoPeersAvailable } -func sortedBlockWithVerifiedBlobSlice(blocks []interfaces.ReadOnlySignedBeaconBlock) ([]blocks2.BlockWithVerifiedBlobs, error) { - rb := make([]blocks2.BlockWithVerifiedBlobs, len(blocks)) +func sortedBlockWithVerifiedBlobSlice(blocks []interfaces.ReadOnlySignedBeaconBlock) ([]blocks2.BlockWithROBlobs, error) { + rb := make([]blocks2.BlockWithROBlobs, len(blocks)) for i, b := range blocks { ro, err := blocks2.NewROBlock(b) if err != nil { return nil, err } - rb[i] = blocks2.BlockWithVerifiedBlobs{Block: ro} + rb[i] = blocks2.BlockWithROBlobs{Block: ro} } - sort.Sort(blocks2.BlockWithVerifiedBlobsSlice(rb)) + sort.Sort(blocks2.BlockWithROBlobsSlice(rb)) return rb, nil } -func blobRequest(bwb []blocks2.BlockWithVerifiedBlobs, blobWindowStart primitives.Slot) *p2ppb.BlobSidecarsByRangeRequest { +func blobRequest(bwb []blocks2.BlockWithROBlobs, blobWindowStart primitives.Slot) *p2ppb.BlobSidecarsByRangeRequest { if len(bwb) == 0 { return nil } @@ -360,7 +360,7 @@ func blobRequest(bwb []blocks2.BlockWithVerifiedBlobs, blobWindowStart primitive } } -func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWithVerifiedBlobs) *primitives.Slot { +func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWithROBlobs) *primitives.Slot { if len(bwb) == 0 { return nil } @@ -398,7 +398,7 @@ func sortBlobs(blobs []blocks.ROBlob) []blocks.ROBlob { var errBlobVerification = errors.New("peer unable to serve aligned BlobSidecarsByRange and BeaconBlockSidecarsByRange responses") var errMissingBlobsForBlockCommitments = errors.Wrap(errBlobVerification, "blobs unavailable for processing block with kzg commitments") -func verifyAndPopulateBlobs(bwb []blocks2.BlockWithVerifiedBlobs, blobs []blocks.ROBlob, blobWindowStart primitives.Slot) ([]blocks2.BlockWithVerifiedBlobs, error) { +func verifyAndPopulateBlobs(bwb []blocks2.BlockWithROBlobs, blobs []blocks.ROBlob, blobWindowStart primitives.Slot) ([]blocks2.BlockWithROBlobs, error) { // Assumes bwb has already been sorted by sortedBlockWithVerifiedBlobSlice. blobs = sortBlobs(blobs) blobi := 0 @@ -450,7 +450,7 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e } // fetchBlobsFromPeer fetches blocks from a single randomly selected peer. 
-func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithVerifiedBlobs, pid peer.ID) ([]blocks2.BlockWithVerifiedBlobs, error) { +func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithROBlobs, pid peer.ID) ([]blocks2.BlockWithROBlobs, error) { ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer") defer span.End() if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index c25efbaf41ca..ec1ff406b3de 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -306,9 +306,9 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { fetcher.stop() }() - processFetchedBlocks := func() ([]blocks.BlockWithVerifiedBlobs, error) { + processFetchedBlocks := func() ([]blocks.BlockWithROBlobs, error) { defer cancel() - var unionRespBlocks []blocks.BlockWithVerifiedBlobs + var unionRespBlocks []blocks.BlockWithROBlobs for { select { @@ -347,7 +347,7 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { bwb, err := processFetchedBlocks() assert.NoError(t, err) - sort.Sort(blocks.BlockWithVerifiedBlobsSlice(bwb)) + sort.Sort(blocks.BlockWithROBlobsSlice(bwb)) ss := make([]primitives.Slot, len(bwb)) for i, b := range bwb { ss[i] = b.Block.Block().Slot() @@ -454,7 +454,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { } }() - var bwb []blocks.BlockWithVerifiedBlobs + var bwb []blocks.BlockWithROBlobs select { case <-ctx.Done(): t.Error(ctx.Err()) @@ -1015,7 +1015,7 @@ func TestLowestSlotNeedsBlob(t *testing.T) { func TestBlobRequest(t *testing.T) { var nilReq *ethpb.BlobSidecarsByRangeRequest // no blocks - req := blobRequest([]blocks.BlockWithVerifiedBlobs{}, 0) + req := blobRequest([]blocks.BlockWithROBlobs{}, 0) require.Equal(t, nilReq, req) blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) @@ -1047,7 +1047,7 @@ func TestBlobRequest(t *testing.T) { require.Equal(t, len(allAfter), int(req.Count)) } -func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithVerifiedBlobs, []blocks.ROBlob) { +func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithROBlobs, []blocks.ROBlob) { blks, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, nblocks) sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) for i := range blks { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index b80c54635057..57c63cc33476 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -23,7 +23,7 @@ import ( // either in DB or initial sync cache. type forkData struct { peer peer.ID - bwb []blocks.BlockWithVerifiedBlobs + bwb []blocks.BlockWithROBlobs } // nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot. diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 628be0eb1744..242b9adfdace 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -92,7 +92,7 @@ type blocksQueue struct { // blocksQueueFetchedData is a data container that is returned from a queue on each step. 
type blocksQueueFetchedData struct { pid peer.ID - bwb []blocks.BlockWithVerifiedBlobs + bwb []blocks.BlockWithROBlobs } // newBlocksQueue creates initialized priority queue. diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 87610368bfdd..6154c102b4c3 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -263,7 +263,7 @@ func TestBlocksQueue_Loop(t *testing.T) { highestExpectedSlot: tt.highestExpectedSlot, }) assert.NoError(t, queue.start()) - processBlock := func(b blocks.BlockWithVerifiedBlobs) error { + processBlock := func(b blocks.BlockWithROBlobs) error { block := b.Block if !beaconDB.HasBlock(ctx, block.Block().ParentRoot()) { return fmt.Errorf("%w: %#x", errParentDoesNotExist, block.Block().ParentRoot()) @@ -272,10 +272,10 @@ func TestBlocksQueue_Loop(t *testing.T) { if err != nil { return err } - return mc.ReceiveBlock(ctx, block, root) + return mc.ReceiveBlock(ctx, block, root, nil) } - var blocks []blocks.BlockWithVerifiedBlobs + var blocks []blocks.BlockWithROBlobs for data := range queue.fetchedData { for _, b := range data.bwb { if err := processBlock(b); err != nil { @@ -538,7 +538,7 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { require.NoError(t, err) response := &fetchRequestResponse{ pid: "abc", - bwb: []blocks.BlockWithVerifiedBlobs{ + bwb: []blocks.BlockWithROBlobs{ {Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsb}}, {Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsbCopy}}, }, @@ -638,7 +638,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { queue.smm.machines[256].pid = pidDataParsed rwsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - queue.smm.machines[256].bwb = []blocks.BlockWithVerifiedBlobs{ + queue.smm.machines[256].bwb = []blocks.BlockWithROBlobs{ {Block: rwsb}, } @@ -672,7 +672,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { queue.smm.machines[320].pid = pidDataParsed rwsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - queue.smm.machines[320].bwb = []blocks.BlockWithVerifiedBlobs{ + queue.smm.machines[320].bwb = []blocks.BlockWithROBlobs{ {Block: rwsb}, } @@ -703,7 +703,7 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { queue.smm.machines[320].pid = pidDataParsed rwsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - queue.smm.machines[320].bwb = []blocks.BlockWithVerifiedBlobs{ + queue.smm.machines[320].bwb = []blocks.BlockWithROBlobs{ {Block: rwsb}, } diff --git a/beacon-chain/sync/initial-sync/fsm.go b/beacon-chain/sync/initial-sync/fsm.go index 9f3be7f561d3..ab6ec0db757e 100644 --- a/beacon-chain/sync/initial-sync/fsm.go +++ b/beacon-chain/sync/initial-sync/fsm.go @@ -46,7 +46,7 @@ type stateMachine struct { start primitives.Slot state stateID pid peer.ID - bwb []blocks.BlockWithVerifiedBlobs + bwb []blocks.BlockWithROBlobs updated time.Time } @@ -78,7 +78,7 @@ func (smm *stateMachineManager) addStateMachine(startSlot primitives.Slot) *stat smm: smm, start: startSlot, state: stateNew, - bwb: []blocks.BlockWithVerifiedBlobs{}, + bwb: []blocks.BlockWithROBlobs{}, updated: prysmTime.Now(), } smm.recalculateMachineAttribs() diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 5f313eab48f8..7d600aeddb7a 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -10,8 +10,8 @@ import ( "github.com/paulbellamy/ratecounter" 
"github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -25,10 +25,10 @@ const ( ) // blockReceiverFn defines block receiving function. -type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error +type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error // batchBlockReceiverFn defines batch receiving function. -type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock) error +type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error // Round Robin sync looks at the latest peer statuses and syncs up to the highest known epoch. // @@ -159,47 +159,46 @@ func (s *Service) processFetchedDataRegSync( ctx context.Context, genesis time.Time, startSlot primitives.Slot, data *blocksQueueFetchedData) { defer s.updatePeerScorerStats(data.pid, startSlot) - blockReceiver := s.cfg.Chain.ReceiveBlock - invalidBlocks := 0 - blksWithoutParentCount := 0 + bwb, err := validUnprocessed(ctx, data.bwb, s.cfg.Chain.HeadSlot(), s.isProcessedBlock) + if err != nil { + log.WithError(err).Debug("batch did not contain a valid sequence of unprocessed blocks") + return + } + if len(bwb) == 0 { + return + } + bv := newBlobBatchVerifier(s.newBlobVerifier) + avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv) + batchFields := logrus.Fields{ + "firstSlot": data.bwb[0].Block.Block().Slot(), + "firstUnprocessed": bwb[0].Block.Block().Slot(), + } for _, b := range data.bwb { - if len(b.Blobs) > 0 { - verified, err := verification.BlobSidecarSliceNoop(b.Blobs) - if err != nil { - log.WithField("root", b.Block.Root()).WithError(err).Error("blobs failed verification") - continue - } - for i := range verified { - if err := s.cfg.BlobStorage.Save(verified[i]); err != nil { - log.WithError(err).Warn("Failed to save blob sidecar") - } - } + if err := avs.Persist(s.clock.CurrentSlot(), b.Blobs...); err != nil { + log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Batch failure due to BlobSidecar issues") + return } - - if err := s.processBlock(ctx, genesis, b, blockReceiver); err != nil { + if err := s.processBlock(ctx, genesis, b, s.cfg.Chain.ReceiveBlock, avs); err != nil { switch { case errors.Is(err, errBlockAlreadyProcessed): - log.WithError(err).Debug("Block is not processed") - invalidBlocks++ + log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Skipping already processed block") + continue case errors.Is(err, errParentDoesNotExist): - blksWithoutParentCount++ - invalidBlocks++ + log.WithFields(batchFields).WithField("missingParent", fmt.Sprintf("%#x", b.Block.Block().ParentRoot())). 
+ WithFields(syncFields(b.Block)).Debug("Could not process batch blocks due to missing parent") + return default: - log.WithError(err).Warn("Block is not processed") + log.WithError(err).WithFields(batchFields).WithFields(syncFields(b.Block)).Warn("Block processing failure") + return } - continue } } - if blksWithoutParentCount > 0 { - log.WithFields(logrus.Fields{ - "missingParent": fmt.Sprintf("%#x", data.bwb[0].Block.Block().ParentRoot()), - "firstSlot": data.bwb[0].Block.Block().Slot(), - "lastSlot": data.bwb[blksWithoutParentCount-1].Block.Block().Slot(), - }).Debug("Could not process batch blocks due to missing parent") - } - // Add more visible logging if all blocks cannot be processed. - if len(data.bwb) == invalidBlocks { - log.WithField("error", "Range had no valid blocks to process").Warn("Range is not processed") +} + +func syncFields(b blocks.ROBlock) logrus.Fields { + return logrus.Fields{ + "root": fmt.Sprintf("%#x", b.Root()), + "lastSlot": b.Block().Slot(), } } @@ -260,8 +259,9 @@ func (s *Service) logBatchSyncStatus(genesis time.Time, firstBlk blocks.ROBlock, func (s *Service) processBlock( ctx context.Context, genesis time.Time, - bwb blocks.BlockWithVerifiedBlobs, + bwb blocks.BlockWithROBlobs, blockReceiver blockReceiverFn, + avs das.AvailabilityStore, ) error { blk := bwb.Block blkRoot := blk.Root() @@ -273,12 +273,12 @@ func (s *Service) processBlock( if !s.cfg.Chain.HasBlock(ctx, blk.Block().ParentRoot()) { return fmt.Errorf("%w: (in processBlock, slot=%d) %#x", errParentDoesNotExist, blk.Block().Slot(), blk.Block().ParentRoot()) } - return blockReceiver(ctx, blk, blkRoot) + return blockReceiver(ctx, blk, blkRoot, avs) } type processedChecker func(context.Context, blocks.ROBlock) bool -func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithVerifiedBlobs, headSlot primitives.Slot, isProc processedChecker) ([]blocks.BlockWithVerifiedBlobs, error) { +func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithROBlobs, headSlot primitives.Slot, isProc processedChecker) ([]blocks.BlockWithROBlobs, error) { // use a pointer to avoid confusing the zero-value with the case where the first element is processed. 
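// A nil pointer means no leading element of bwb has been processed yet, while a pointer to 0 means the first element has been; a plain int index could not distinguish those two cases.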
var processed *int for i := range bwb { @@ -309,7 +309,7 @@ func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithVerifiedBlobs, } func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time, - bwb []blocks.BlockWithVerifiedBlobs, bFunc batchBlockReceiverFn) error { + bwb []blocks.BlockWithROBlobs, bFunc batchBlockReceiverFn) error { if len(bwb) == 0 { return errors.New("0 blocks provided into method") } @@ -328,32 +328,20 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time, return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)", errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot()) } + + bv := newBlobBatchVerifier(s.newBlobVerifier) + avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv) s.logBatchSyncStatus(genesis, first, len(bwb)) - blobCount := 0 for _, bb := range bwb { if len(bb.Blobs) == 0 { continue } - verified, err := verification.BlobSidecarSliceNoop(bb.Blobs) - if err != nil { - return errors.Wrapf(err, "blobs for root %#x failed verification", bb.Block.Root()) - } - for i := range verified { - if err := s.cfg.BlobStorage.Save(verified[i]); err != nil { - return errors.Wrapf(err, "failed to save blobs for block %#x", bb.Block.Root()) - } + if err := avs.Persist(s.clock.CurrentSlot(), bb.Blobs...); err != nil { + return err } - blobCount += len(bb.Blobs) - } - if blobCount > 0 { - log.WithFields(logrus.Fields{ - "startSlot": bwb[0].Block.Block().Slot(), - "endSlot": bwb[len(bwb)-1].Block.Block().Slot(), - "count": blobCount, - }).Info("Processed blob sidecars") } - return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks()) + return bFunc(ctx, blocks.BlockWithROBlobsSlice(bwb).ROBlocks(), avs) } // updatePeerScorerStats adjusts monitored metrics for a peer. diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index dab0f3a42a4a..8db187847550 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -8,6 +8,7 @@ import ( "github.com/paulbellamy/ratecounter" "github.com/prysmaticlabs/prysm/v4/async/abool" mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" @@ -371,11 +372,11 @@ func TestService_processBlock(t *testing.T) { require.NoError(t, err) rowsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func( - ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { - assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot)) + err = s.processBlock(ctx, genesis, blocks.BlockWithROBlobs{Block: rowsb}, func( + ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityStore) error { + assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot, nil)) return nil - }) + }, nil) assert.NoError(t, err) // Duplicate processing should trigger error. 
@@ -383,10 +384,10 @@ func TestService_processBlock(t *testing.T) { require.NoError(t, err) rowsb, err = blocks.NewROBlock(wsb) require.NoError(t, err) - err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func( - ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { + err = s.processBlock(ctx, genesis, blocks.BlockWithROBlobs{Block: rowsb}, func( + ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityStore) error { return nil - }) + }, nil) assert.ErrorContains(t, errBlockAlreadyProcessed.Error(), err) // Continue normal processing, should proceed w/o errors. @@ -394,11 +395,11 @@ func TestService_processBlock(t *testing.T) { require.NoError(t, err) rowsb, err = blocks.NewROBlock(wsb) require.NoError(t, err) - err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func( - ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { - assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot)) + err = s.processBlock(ctx, genesis, blocks.BlockWithROBlobs{Block: rowsb}, func( + ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, _ das.AvailabilityStore) error { + assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot, nil)) return nil - }) + }, nil) assert.NoError(t, err) assert.Equal(t, primitives.Slot(2), s.cfg.Chain.HeadSlot(), "Unexpected head slot") }) @@ -429,7 +430,7 @@ func TestService_processBlockBatch(t *testing.T) { genesis := makeGenesisTime(32) t.Run("process non-linear batch", func(t *testing.T) { - var batch []blocks.BlockWithVerifiedBlobs + var batch []blocks.BlockWithROBlobs currBlockRoot := genesisBlkRoot for i := primitives.Slot(1); i < 10; i++ { parentRoot := currBlockRoot @@ -443,11 +444,11 @@ func TestService_processBlockBatch(t *testing.T) { require.NoError(t, err) rowsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - batch = append(batch, blocks.BlockWithVerifiedBlobs{Block: rowsb}) + batch = append(batch, blocks.BlockWithROBlobs{Block: rowsb}) currBlockRoot = blk1Root } - var batch2 []blocks.BlockWithVerifiedBlobs + var batch2 []blocks.BlockWithROBlobs for i := primitives.Slot(10); i < 20; i++ { parentRoot := currBlockRoot blk1 := util.NewBeaconBlock() @@ -460,19 +461,19 @@ func TestService_processBlockBatch(t *testing.T) { require.NoError(t, err) rowsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - batch2 = append(batch2, blocks.BlockWithVerifiedBlobs{Block: rowsb}) + batch2 = append(batch2, blocks.BlockWithROBlobs{Block: rowsb}) currBlockRoot = blk1Root } - cbnormal := func(ctx context.Context, blks []blocks.ROBlock) error { - assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks)) + cbnormal := func(ctx context.Context, blks []blocks.ROBlock, avs das.AvailabilityStore) error { + assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, avs)) return nil } // Process block normally. 
err = s.processBatchedBlocks(ctx, genesis, batch, cbnormal) assert.NoError(t, err) - cbnil := func(ctx context.Context, blocks []blocks.ROBlock) error { + cbnil := func(ctx context.Context, blocks []blocks.ROBlock, _ das.AvailabilityStore) error { return nil } @@ -480,7 +481,7 @@ func TestService_processBlockBatch(t *testing.T) { err = s.processBatchedBlocks(ctx, genesis, batch, cbnil) assert.ErrorContains(t, "block is already processed", err) - var badBatch2 []blocks.BlockWithVerifiedBlobs + var badBatch2 []blocks.BlockWithROBlobs for i, b := range batch2 { // create a non-linear batch if i%3 == 0 && i != 0 { @@ -675,7 +676,7 @@ func TestService_ValidUnprocessed(t *testing.T) { require.NoError(t, err) util.SaveBlock(t, context.Background(), beaconDB, genesisBlk) - var batch []blocks.BlockWithVerifiedBlobs + var batch []blocks.BlockWithROBlobs currBlockRoot := genesisBlkRoot for i := primitives.Slot(1); i < 10; i++ { parentRoot := currBlockRoot @@ -689,7 +690,7 @@ func TestService_ValidUnprocessed(t *testing.T) { require.NoError(t, err) rowsb, err := blocks.NewROBlock(wsb) require.NoError(t, err) - batch = append(batch, blocks.BlockWithVerifiedBlobs{Block: rowsb}) + batch = append(batch, blocks.BlockWithROBlobs{Block: rowsb}) currBlockRoot = blk1Root } diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 68c4a7968451..540a7b4a4435 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -17,6 +17,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/runtime" @@ -47,19 +48,32 @@ type Config struct { // Service service. type Service struct { - cfg *Config - ctx context.Context - cancel context.CancelFunc - synced *abool.AtomicBool - chainStarted *abool.AtomicBool - counter *ratecounter.RateCounter - genesisChan chan time.Time - clock *startup.Clock + cfg *Config + ctx context.Context + cancel context.CancelFunc + synced *abool.AtomicBool + chainStarted *abool.AtomicBool + counter *ratecounter.RateCounter + genesisChan chan time.Time + clock *startup.Clock + verifierWaiter *verification.InitializerWaiter + newBlobVerifier verification.NewBlobVerifier +} + +// Option is a functional option for the initial-sync Service. +type Option func(*Service) + +// WithVerifierWaiter sets the verification.InitializerWaiter +// for the initial-sync Service. +func WithVerifierWaiter(viw *verification.InitializerWaiter) Option { + return func(s *Service) { + s.verifierWaiter = viw + } } // NewService configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. 
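// Illustrative usage (hypothetical call site; the actual wiring is done in the beacon node setup elsewhere in this change):
//   initialsync.NewService(ctx, cfg, initialsync.WithVerifierWaiter(verifierWaiter))
// where verifierWaiter is the shared *verification.InitializerWaiter.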
-func NewService(ctx context.Context, cfg *Config) *Service { +func NewService(ctx context.Context, cfg *Config, opts ...Option) *Service { ctx, cancel := context.WithCancel(ctx) s := &Service{ cfg: cfg, @@ -71,7 +85,9 @@ func NewService(ctx context.Context, cfg *Config) *Service { genesisChan: make(chan time.Time), clock: startup.NewClock(time.Unix(0, 0), [32]byte{}), // default clock to prevent panic } - + for _, o := range opts { + o(s) + } return s } @@ -86,6 +102,13 @@ func (s *Service) Start() { s.clock = clock log.Info("Received state initialized event") + v, err := s.verifierWaiter.WaitForInitializer(s.ctx) + if err != nil { + log.WithError(err).Error("Could not get verification initializer") + return + } + s.newBlobVerifier = newBlobVerifierFromInitializer(v) + gt := clock.GenesisTime() if gt.IsZero() { log.Debug("Exiting Initial Sync Service") diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index 0a6f3eb38d2a..027e0be858d9 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -13,6 +13,7 @@ import ( dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -158,6 +159,7 @@ func TestService_InitStartStop(t *testing.T) { StateNotifier: &mock.MockStateNotifier{}, InitialSyncComplete: make(chan struct{}), }) + s.verifierWaiter = verification.NewInitializerWaiter(gs, nil, nil) time.Sleep(500 * time.Millisecond) assert.NotNil(t, s) if tt.setGenesis != nil { @@ -200,6 +202,7 @@ func TestService_waitForStateInitialization(t *testing.T) { counter: ratecounter.NewRateCounter(counterSeconds * time.Second), genesisChan: make(chan time.Time), } + s.verifierWaiter = verification.NewInitializerWaiter(cs, nil, nil) return s, cs } diff --git a/beacon-chain/sync/initial-sync/verification.go b/beacon-chain/sync/initial-sync/verification.go new file mode 100644 index 000000000000..084d05fbe623 --- /dev/null +++ b/beacon-chain/sync/initial-sync/verification.go @@ -0,0 +1,99 @@ +package initialsync + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/kzg" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/das" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" +) + +var ( + // ErrBatchSignatureMismatch is returned by VerifiedROBlobs when any of the blobs in the batch have a signature + // which does not match the signature for the block with a corresponding root. + ErrBatchSignatureMismatch = errors.New("Sidecar block header signature does not match signed block") + // ErrBatchBlockRootMismatch is returned by VerifiedROBlobs in the scenario where the root of the given signed block + // does not match the block header in one of the corresponding sidecars.
+ ErrBatchBlockRootMismatch = errors.New("Sidecar block header root does not match signed block") +) + +func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return ini.NewBlobVerifier(b, reqs) + } +} + +func newBlobBatchVerifier(newVerifier verification.NewBlobVerifier) *BlobBatchVerifier { + return &BlobBatchVerifier{ + verifyKzg: kzg.Verify, + newVerifier: newVerifier, + } +} + +type kzgVerifier func(b ...blocks.ROBlob) error + +// BlobBatchVerifier solves problems that come from verifying batches of blobs from RPC. +// First: we only update forkchoice after the entire batch has completed, so the n+1 elements in the batch +// won't be in forkchoice yet. +// Second: it is more efficient to batch some verifications, like kzg commitment verification. Batch adds a +// method to BlobVerifier to verify the kzg commitments of all blob sidecars for a block together, then using the cached +// result of the batch verification when verifying the individual blobs. +type BlobBatchVerifier struct { + verifyKzg kzgVerifier + newVerifier verification.NewBlobVerifier +} + +var _ das.BlobBatchVerifier = &BlobBatchVerifier{} + +func (batch *BlobBatchVerifier) VerifiedROBlobs(ctx context.Context, blk blocks.ROBlock, scs []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) { + if len(scs) == 0 { + return nil, nil + } + // We assume the proposer was validated wrt the block in batch block processing before performing the DA check. + + // So at this stage we just need to make sure the value being signed and signature bytes match the block. + for i := range scs { + if blk.Signature() != bytesutil.ToBytes96(scs[i].SignedBlockHeader.Signature) { + return nil, ErrBatchSignatureMismatch + } + // Extra defensive check to make sure the roots match. This should be unnecessary in practice since the root from + // the block should be used as the lookup key into the cache of sidecars. + if blk.Root() != scs[i].BlockRoot() { + return nil, ErrBatchBlockRootMismatch + } + } + // Verify commitments for all blobs at once. verifyOneBlob assumes it is only called once this check succeeds. + if err := batch.verifyKzg(scs...); err != nil { + return nil, err + } + vs := make([]blocks.VerifiedROBlob, len(scs)) + for i := range scs { + vb, err := batch.verifyOneBlob(ctx, scs[i]) + if err != nil { + return nil, err + } + vs[i] = vb + } + return vs, nil +} + +func (batch *BlobBatchVerifier) verifyOneBlob(ctx context.Context, sc blocks.ROBlob) (blocks.VerifiedROBlob, error) { + vb := blocks.VerifiedROBlob{} + bv := batch.newVerifier(sc, verification.InitsyncSidecarRequirements) + // We can satisfy the following 2 requirements immediately because VerifiedROBlobs always verifies commitments + // and block signature for all blobs in the batch before calling verifyOneBlob. 
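+ // The remaining InitsyncSidecarRequirements (blob index bounds and the inclusion proof) are still checked individually for each sidecar below.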
+ bv.SatisfyRequirement(verification.RequireSidecarKzgProofVerified) + bv.SatisfyRequirement(verification.RequireValidProposerSignature) + + if err := bv.BlobIndexInBounds(); err != nil { + return vb, err + } + if err := bv.SidecarInclusionProven(); err != nil { + return vb, err + } + + return bv.VerifiedROBlob() +} diff --git a/beacon-chain/sync/mock_blob_verifier.go b/beacon-chain/sync/mock_blob_verifier.go deleted file mode 100644 index 70b14bff2f28..000000000000 --- a/beacon-chain/sync/mock_blob_verifier.go +++ /dev/null @@ -1,84 +0,0 @@ -package sync - -import ( - "context" - - "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" -) - -type BlobVerifier interface { - VerifiedROBlob() (blocks.VerifiedROBlob, error) - BlobIndexInBounds() (err error) - SlotNotTooEarly() (err error) - SlotAboveFinalized() (err error) - ValidProposerSignature(ctx context.Context) (err error) - SidecarParentSeen(badParent func([32]byte) bool) (err error) - SidecarParentValid(badParent func([32]byte) bool) (err error) - SidecarParentSlotLower() (err error) - SidecarDescendsFromFinalized() (err error) - SidecarInclusionProven() (err error) - SidecarKzgProofVerified() (err error) - SidecarProposerExpected(ctx context.Context) (err error) -} - -type mockBlobVerifier struct { - errBlobIndexInBounds error - errSlotTooEarly error - errSlotAboveFinalized error - errValidProposerSignature error - errSidecarParentSeen error - errSidecarParentValid error - errSidecarParentSlotLower error - errSidecarDescendsFromFinalized error - errSidecarInclusionProven error - errSidecarKzgProofVerified error - errSidecarProposerExpected error -} - -func (m *mockBlobVerifier) VerifiedROBlob() (blocks.VerifiedROBlob, error) { - return blocks.VerifiedROBlob{}, nil -} - -func (m *mockBlobVerifier) BlobIndexInBounds() (err error) { - return m.errBlobIndexInBounds -} - -func (m *mockBlobVerifier) SlotNotTooEarly() (err error) { - return m.errSlotTooEarly -} - -func (m *mockBlobVerifier) SlotAboveFinalized() (err error) { - return m.errSlotAboveFinalized -} - -func (m *mockBlobVerifier) ValidProposerSignature(ctx context.Context) (err error) { - return m.errValidProposerSignature -} - -func (m *mockBlobVerifier) SidecarParentSeen(badParent func([32]byte) bool) (err error) { - return m.errSidecarParentSeen -} - -func (m *mockBlobVerifier) SidecarParentValid(badParent func([32]byte) bool) (err error) { - return m.errSidecarParentValid -} - -func (m *mockBlobVerifier) SidecarParentSlotLower() (err error) { - return m.errSidecarParentSlotLower -} - -func (m *mockBlobVerifier) SidecarDescendsFromFinalized() (err error) { - return m.errSidecarDescendsFromFinalized -} - -func (m *mockBlobVerifier) SidecarInclusionProven() (err error) { - return m.errSidecarInclusionProven -} - -func (m *mockBlobVerifier) SidecarKzgProofVerified() (err error) { - return m.errSidecarKzgProofVerified -} - -func (m *mockBlobVerifier) SidecarProposerExpected(ctx context.Context) (err error) { - return m.errSidecarProposerExpected -} diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 18fe859ecfcb..428adf09a1e8 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -211,7 +211,7 @@ func (s *Service) processAndBroadcastBlock(ctx context.Context, b interfaces.Rea } } - if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot); err != nil { + if err := s.cfg.chain.ReceiveBlock(ctx, b, blkRoot, nil); err != nil { return err } diff --git a/beacon-chain/sync/service.go 
b/beacon-chain/sync/service.go index 8b71fbe16cf3..bc664489814e 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -154,7 +154,7 @@ type Service struct { clockWaiter startup.ClockWaiter initialSyncComplete chan struct{} verifierWaiter *verification.InitializerWaiter - newBlobVerifier NewBlobVerifier + newBlobVerifier verification.NewBlobVerifier } // NewService initializes new regular sync service. @@ -205,11 +205,9 @@ func NewService(ctx context.Context, opts ...Option) *Service { return r } -type NewBlobVerifier func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier - -func newBlobVerifierFromInitializer(ini *verification.Initializer) NewBlobVerifier { - return func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return ini.NewBlobVerifier(b, reqs...) +func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return ini.NewBlobVerifier(b, reqs) } } diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index 5eb5931a2be6..95c7c23e4332 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -27,7 +27,7 @@ func (s *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) return err } - if err := s.cfg.chain.ReceiveBlock(ctx, signed, root); err != nil { + if err := s.cfg.chain.ReceiveBlock(ctx, signed, root, nil); err != nil { if blockchain.IsInvalidBlock(err) { r := blockchain.InvalidBlockRoot(err) if r != [32]byte{} { diff --git a/beacon-chain/sync/validate_blob.go b/beacon-chain/sync/validate_blob.go index 22b547596263..bfa2b5c8a579 100644 --- a/beacon-chain/sync/validate_blob.go +++ b/beacon-chain/sync/validate_blob.go @@ -47,7 +47,7 @@ func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Mes if err != nil { return pubsub.ValidationReject, errors.Wrap(err, "roblob conversion failure") } - vf := s.newBlobVerifier(blob, verification.GossipSidecarRequirements...) 
+ vf := s.newBlobVerifier(blob, verification.GossipSidecarRequirements) if err := vf.BlobIndexInBounds(); err != nil { return pubsub.ValidationReject, err @@ -60,7 +60,7 @@ func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Mes return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic) } - if err := vf.SlotNotTooEarly(); err != nil { + if err := vf.NotFromFutureSlot(); err != nil { return pubsub.ValidationIgnore, err } diff --git a/beacon-chain/sync/validate_blob_test.go b/beacon-chain/sync/validate_blob_test.go index a9d542b06ec6..83b3322ac0eb 100644 --- a/beacon-chain/sync/validate_blob_test.go +++ b/beacon-chain/sync/validate_blob_test.go @@ -172,83 +172,83 @@ func TestValidateBlob_ErrorPathsWithMock(t *testing.T) { tests := []struct { name string error error - verifier NewBlobVerifier + verifier verification.NewBlobVerifier result pubsub.ValidationResult }{ { error: errors.New("blob index out of bound"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errBlobIndexInBounds: errors.New("blob index out of bound")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrBlobIndexInBounds: errors.New("blob index out of bound")} }, result: pubsub.ValidationReject, }, { error: errors.New("slot too early"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSlotTooEarly: errors.New("slot too early")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSlotTooEarly: errors.New("slot too early")} }, result: pubsub.ValidationIgnore, }, { error: errors.New("slot above finalized"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSlotAboveFinalized: errors.New("slot above finalized")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSlotAboveFinalized: errors.New("slot above finalized")} }, result: pubsub.ValidationIgnore, }, { error: errors.New("valid proposer signature"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errValidProposerSignature: errors.New("valid proposer signature")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrValidProposerSignature: errors.New("valid proposer signature")} }, result: pubsub.ValidationReject, }, { error: errors.New("sidecar parent seen"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarParentSeen: errors.New("sidecar parent seen")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarParentSeen: errors.New("sidecar parent seen")} }, result: pubsub.ValidationIgnore, }, { error: errors.New("sidecar parent valid"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarParentValid: errors.New("sidecar parent valid")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarParentValid: errors.New("sidecar parent valid")} 
}, result: pubsub.ValidationReject, }, { error: errors.New("sidecar parent slot lower"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarParentSlotLower: errors.New("sidecar parent slot lower")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarParentSlotLower: errors.New("sidecar parent slot lower")} }, result: pubsub.ValidationReject, }, { error: errors.New("descends from finalized"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarDescendsFromFinalized: errors.New("descends from finalized")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarDescendsFromFinalized: errors.New("descends from finalized")} }, result: pubsub.ValidationReject, }, { error: errors.New("inclusion proven"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarInclusionProven: errors.New("inclusion proven")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarInclusionProven: errors.New("inclusion proven")} }, result: pubsub.ValidationReject, }, { error: errors.New("kzg proof verified"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarKzgProofVerified: errors.New("kzg proof verified")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarKzgProofVerified: errors.New("kzg proof verified")} }, result: pubsub.ValidationReject, }, { error: errors.New("sidecar proposer expected"), - verifier: func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{errSidecarProposerExpected: errors.New("sidecar proposer expected")} + verifier: func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{ErrSidecarProposerExpected: errors.New("sidecar proposer expected")} }, result: pubsub.ValidationReject, }, @@ -285,8 +285,8 @@ func TestValidateBlob_ErrorPathsWithMock(t *testing.T) { } } -func testNewBlobVerifier() NewBlobVerifier { - return func(b blocks.ROBlob, reqs ...verification.Requirement) BlobVerifier { - return &mockBlobVerifier{} +func testNewBlobVerifier() verification.NewBlobVerifier { + return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { + return &verification.MockBlobVerifier{} } } diff --git a/beacon-chain/verification/BUILD.bazel b/beacon-chain/verification/BUILD.bazel index 65247a50ce36..0a77b7868b3d 100644 --- a/beacon-chain/verification/BUILD.bazel +++ b/beacon-chain/verification/BUILD.bazel @@ -8,6 +8,8 @@ go_library( "error.go", "fake.go", "initializer.go", + "interface.go", + "mock.go", "result.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/verification", diff --git a/beacon-chain/verification/blob.go b/beacon-chain/verification/blob.go index bb91edc79de7..cc8e5f6cb071 100644 --- a/beacon-chain/verification/blob.go +++ b/beacon-chain/verification/blob.go @@ -16,7 +16,7 @@ import ( const ( RequireBlobIndexInBounds Requirement = iota - RequireSlotNotTooEarly + RequireNotFromFutureSlot RequireSlotAboveFinalized 
RequireValidProposerSignature RequireSidecarParentSeen @@ -32,7 +32,7 @@ const ( // must satisfy in order to upgrade an ROBlob to a VerifiedROBlob. var GossipSidecarRequirements = []Requirement{ RequireBlobIndexInBounds, - RequireSlotNotTooEarly, + RequireNotFromFutureSlot, RequireSlotAboveFinalized, RequireValidProposerSignature, RequireSidecarParentSeen, @@ -44,12 +44,30 @@ var GossipSidecarRequirements = []Requirement{ RequireSidecarProposerExpected, } +// InitsyncSidecarRequirements is the list of verification requirements to be used by the init-sync service +// for batch-mode syncing. Because we only perform batch verification as part of the IsDataAvailable method +// for blobs after the block has been verified, and the blobs to be verified are keyed in the cache by the +// block root, it is safe to skip the following verifications. +// RequireSidecarProposerExpected +// RequireNotFromFutureSlot, +// RequireSlotAboveFinalized, +// RequireSidecarParentSeen, +// RequireSidecarParentValid, +// RequireSidecarParentSlotLower, +// RequireSidecarDescendsFromFinalized, +var InitsyncSidecarRequirements = []Requirement{ + RequireValidProposerSignature, + RequireSidecarKzgProofVerified, + RequireBlobIndexInBounds, + RequireSidecarInclusionProven, +} + var ( ErrBlobInvalid = errors.New("blob failed verification") // ErrBlobIndexInvalid means RequireBlobIndexInBounds failed. ErrBlobIndexInvalid = errors.Wrap(ErrBlobInvalid, "incorrect blob sidecar index") - // ErrSlotTooEarly means RequireSlotNotTooEarly failed. - ErrSlotTooEarly = errors.Wrap(ErrBlobInvalid, "slot is too far in the future") + // ErrFromFutureSlot means RequireNotFromFutureSlot failed. + ErrFromFutureSlot = errors.Wrap(ErrBlobInvalid, "slot is too far in the future") // ErrSlotNotAfterFinalized means RequireSlotAboveFinalized failed. ErrSlotNotAfterFinalized = errors.Wrap(ErrBlobInvalid, "slot <= finalized checkpoint") // ErrInvalidProposerSignature means RequireValidProposerSignature failed. @@ -70,7 +88,7 @@ var ( ErrSidecarUnexpectedProposer = errors.Wrap(ErrBlobInvalid, "sidecar was not proposed by the expected proposer_index") ) -type BlobVerifier struct { +type ROBlobVerifier struct { *sharedResources results *results blob blocks.ROBlob @@ -78,19 +96,31 @@ type BlobVerifier struct { verifyBlobCommitment roblobCommitmentVerifier } -type roblobCommitmentVerifier func(blocks.ROBlob) error +type roblobCommitmentVerifier func(...blocks.ROBlob) error + +var _ BlobVerifier = &ROBlobVerifier{} // VerifiedROBlob "upgrades" the wrapped ROBlob to a VerifiedROBlob. // If any of the verifications ran against the blob failed, or some required verifications // were not run, an error will be returned. -func (bv *BlobVerifier) VerifiedROBlob() (blocks.VerifiedROBlob, error) { +func (bv *ROBlobVerifier) VerifiedROBlob() (blocks.VerifiedROBlob, error) { if bv.results.allSatisfied() { return blocks.NewVerifiedROBlob(bv.blob), nil } return blocks.VerifiedROBlob{}, bv.results.errors(ErrBlobInvalid) } -func (bv *BlobVerifier) recordResult(req Requirement, err *error) { +// SatisfyRequirement allows the caller to assert that a requirement has been satisfied. +// This gives us a way to tick the box for a requirement where the usual method would be impractical. +// For example, when batch syncing, forkchoice is only updated at the end of the batch. So the checks that use +// forkchoice, like descends from finalized or parent seen, would necessarily fail.
Allowing the caller to + assert the requirement has been satisfied ensures we have an easy way to audit which piece of code is satisfying + a requirement outside of this package. +func (bv *ROBlobVerifier) SatisfyRequirement(req Requirement) { + bv.recordResult(req, nil) +} + +func (bv *ROBlobVerifier) recordResult(req Requirement, err *error) { if err == nil || *err == nil { bv.results.record(req, nil) return @@ -100,7 +130,7 @@ func (bv *BlobVerifier) recordResult(req Requirement, err *error) { // BlobIndexInBounds represents the follow spec verification: // [REJECT] The sidecar's index is consistent with MAX_BLOBS_PER_BLOCK -- i.e. blob_sidecar.index < MAX_BLOBS_PER_BLOCK. -func (bv *BlobVerifier) BlobIndexInBounds() (err error) { +func (bv *ROBlobVerifier) BlobIndexInBounds() (err error) { defer bv.recordResult(RequireBlobIndexInBounds, &err) if bv.blob.Index >= fieldparams.MaxBlobsPerBlock { log.WithFields(logging.BlobFields(bv.blob)).Debug("Sidecar index >= MAX_BLOBS_PER_BLOCK") @@ -109,19 +139,20 @@ func (bv *BlobVerifier) BlobIndexInBounds() (err error) { return nil } -// SlotNotTooEarly represents the spec verification: +// NotFromFutureSlot represents the spec verification: // [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) // -- i.e. validate that block_header.slot <= current_slot -func (bv *BlobVerifier) SlotNotTooEarly() (err error) { - defer bv.recordResult(RequireSlotNotTooEarly, &err) +func (bv *ROBlobVerifier) NotFromFutureSlot() (err error) { + defer bv.recordResult(RequireNotFromFutureSlot, &err) if bv.clock.CurrentSlot() == bv.blob.Slot() { return nil } - // Subtract the max clock disparity from the start slot time. - validAfter := bv.clock.SlotStart(bv.blob.Slot()).Add(-1 * params.BeaconConfig().MaximumGossipClockDisparityDuration()) - // If the difference between now and gt is greater than maximum clock disparity, the block is too far in the future. - if bv.clock.Now().Before(validAfter) { - return ErrSlotTooEarly + // earliestStart represents the time the slot starts, lowered by MAXIMUM_GOSSIP_CLOCK_DISPARITY. + // We lower the time by MAXIMUM_GOSSIP_CLOCK_DISPARITY in case system time is running slightly behind real time. + earliestStart := bv.clock.SlotStart(bv.blob.Slot()).Add(-1 * params.BeaconConfig().MaximumGossipClockDisparityDuration()) + // If the system time is still before earliestStart, we consider the blob from a future slot and return an error. + if bv.clock.Now().Before(earliestStart) { + return ErrFromFutureSlot } return nil } @@ -129,7 +160,7 @@ func (bv *BlobVerifier) SlotNotTooEarly() (err error) { // SlotAboveFinalized represents the spec verification: // [IGNORE] The sidecar is from a slot greater than the latest finalized slot // -- i.e. validate that block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch) -func (bv *BlobVerifier) SlotAboveFinalized() (err error) { +func (bv *ROBlobVerifier) SlotAboveFinalized() (err error) { defer bv.recordResult(RequireSlotAboveFinalized, &err) fcp := bv.fc.FinalizedCheckpoint() fSlot, err := slots.EpochStart(fcp.Epoch) @@ -145,7 +176,7 @@ func (bv *BlobVerifier) SlotAboveFinalized() (err error) { // ValidProposerSignature represents the spec verification: // [REJECT] The proposer signature of blob_sidecar.signed_block_header, // is valid with respect to the block_header.proposer_index pubkey.
-func (bv *BlobVerifier) ValidProposerSignature(ctx context.Context) (err error) { +func (bv *ROBlobVerifier) ValidProposerSignature(ctx context.Context) (err error) { defer bv.recordResult(RequireValidProposerSignature, &err) sd := blobToSignatureData(bv.blob) // First check if there is a cached verification that can be reused. @@ -175,12 +206,12 @@ func (bv *BlobVerifier) ValidProposerSignature(ctx context.Context) (err error) // SidecarParentSeen represents the spec verification: // [IGNORE] The sidecar's block's parent (defined by block_header.parent_root) has been seen // (via both gossip and non-gossip sources) (a client MAY queue sidecars for processing once the parent block is retrieved). -func (bv *BlobVerifier) SidecarParentSeen(badParent func([32]byte) bool) (err error) { +func (bv *ROBlobVerifier) SidecarParentSeen(parentSeen func([32]byte) bool) (err error) { defer bv.recordResult(RequireSidecarParentSeen, &err) - if bv.fc.HasNode(bv.blob.ParentRoot()) { + if parentSeen != nil && parentSeen(bv.blob.ParentRoot()) { return nil } - if badParent != nil && badParent(bv.blob.ParentRoot()) { + if bv.fc.HasNode(bv.blob.ParentRoot()) { return nil } return ErrSidecarParentNotSeen @@ -188,7 +219,7 @@ func (bv *BlobVerifier) SidecarParentSeen(badParent func([32]byte) bool) (err er // SidecarParentValid represents the spec verification: // [REJECT] The sidecar's block's parent (defined by block_header.parent_root) passes validation. -func (bv *BlobVerifier) SidecarParentValid(badParent func([32]byte) bool) (err error) { +func (bv *ROBlobVerifier) SidecarParentValid(badParent func([32]byte) bool) (err error) { defer bv.recordResult(RequireSidecarParentValid, &err) if badParent != nil && badParent(bv.blob.ParentRoot()) { return ErrSidecarParentInvalid @@ -198,7 +229,7 @@ func (bv *BlobVerifier) SidecarParentValid(badParent func([32]byte) bool) (err e // SidecarParentSlotLower represents the spec verification: // [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by block_header.parent_root). -func (bv *BlobVerifier) SidecarParentSlotLower() (err error) { +func (bv *ROBlobVerifier) SidecarParentSlotLower() (err error) { defer bv.recordResult(RequireSidecarParentSlotLower, &err) parentSlot, err := bv.fc.Slot(bv.blob.ParentRoot()) if err != nil { @@ -213,7 +244,7 @@ func (bv *BlobVerifier) SidecarParentSlotLower() (err error) { // SidecarDescendsFromFinalized represents the spec verification: // [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block // -- i.e. get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root. -func (bv *BlobVerifier) SidecarDescendsFromFinalized() (err error) { +func (bv *ROBlobVerifier) SidecarDescendsFromFinalized() (err error) { defer bv.recordResult(RequireSidecarDescendsFromFinalized, &err) if !bv.fc.IsCanonical(bv.blob.ParentRoot()) { return ErrSidecarNotFinalizedDescendent @@ -223,7 +254,7 @@ func (bv *BlobVerifier) SidecarDescendsFromFinalized() (err error) { // SidecarInclusionProven represents the spec verification: // [REJECT] The sidecar's inclusion proof is valid as verified by verify_blob_sidecar_inclusion_proof(blob_sidecar). 
-func (bv *BlobVerifier) SidecarInclusionProven() (err error) { +func (bv *ROBlobVerifier) SidecarInclusionProven() (err error) { defer bv.recordResult(RequireSidecarInclusionProven, &err) if err = blocks.VerifyKZGInclusionProof(bv.blob); err != nil { log.WithError(err).WithFields(logging.BlobFields(bv.blob)).Debug("sidecar inclusion proof verification failed") @@ -235,7 +266,7 @@ func (bv *BlobVerifier) SidecarInclusionProven() (err error) { // SidecarKzgProofVerified represents the spec verification: // [REJECT] The sidecar's blob is valid as verified by // verify_blob_kzg_proof(blob_sidecar.blob, blob_sidecar.kzg_commitment, blob_sidecar.kzg_proof). -func (bv *BlobVerifier) SidecarKzgProofVerified() (err error) { +func (bv *ROBlobVerifier) SidecarKzgProofVerified() (err error) { defer bv.recordResult(RequireSidecarKzgProofVerified, &err) if err = bv.verifyBlobCommitment(bv.blob); err != nil { log.WithError(err).WithFields(logging.BlobFields(bv.blob)).Debug("kzg commitment proof verification failed") @@ -249,7 +280,7 @@ func (bv *BlobVerifier) SidecarKzgProofVerified() (err error) { // in the context of the current shuffling (defined by block_header.parent_root/block_header.slot). // If the proposer_index cannot immediately be verified against the expected shuffling, the sidecar MAY be queued // for later processing while proposers for the block's branch are calculated -- in such a case do not REJECT, instead IGNORE this message. -func (bv *BlobVerifier) SidecarProposerExpected(ctx context.Context) (err error) { +func (bv *ROBlobVerifier) SidecarProposerExpected(ctx context.Context) (err error) { defer bv.recordResult(RequireSidecarProposerExpected, &err) idx, cached := bv.pc.Proposer(bv.blob.ParentRoot(), bv.blob.Slot()) if !cached { @@ -273,7 +304,7 @@ func (bv *BlobVerifier) SidecarProposerExpected(ctx context.Context) (err error) return nil } -func (bv *BlobVerifier) parentState(ctx context.Context) (state.BeaconState, error) { +func (bv *ROBlobVerifier) parentState(ctx context.Context) (state.BeaconState, error) { if bv.parent != nil { return bv.parent, nil } diff --git a/beacon-chain/verification/blob_test.go b/beacon-chain/verification/blob_test.go index 1c69ec5a9589..d35b2aa31401 100644 --- a/beacon-chain/verification/blob_test.go +++ b/beacon-chain/verification/blob_test.go @@ -27,13 +27,13 @@ func TestBlobIndexInBounds(t *testing.T) { _, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1) b := blobs[0] // set Index to a value that is out of bounds - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.BlobIndexInBounds()) require.Equal(t, true, v.results.executed(RequireBlobIndexInBounds)) require.NoError(t, v.results.result(RequireBlobIndexInBounds)) b.Index = fieldparams.MaxBlobsPerBlock - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v = ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.BlobIndexInBounds(), ErrBlobIndexInvalid) require.Equal(t, true, v.results.executed(RequireBlobIndexInBounds)) require.NotNil(t, v.results.result(RequireBlobIndexInBounds)) @@ -52,27 +52,27 @@ func TestSlotNotTooEarly(t *testing.T) { // This clock will give a current slot of 1 on the nose happyClock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(func() time.Time { return now })) ini := Initializer{shared: &sharedResources{clock: happyClock}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) 
- require.NoError(t, v.SlotNotTooEarly()) - require.Equal(t, true, v.results.executed(RequireSlotNotTooEarly)) - require.NoError(t, v.results.result(RequireSlotNotTooEarly)) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) + require.NoError(t, v.NotFromFutureSlot()) + require.Equal(t, true, v.results.executed(RequireNotFromFutureSlot)) + require.NoError(t, v.results.result(RequireNotFromFutureSlot)) // Since we have an early return for slots that are directly equal, give a time that is less than max disparity // but still in the previous slot. closeClock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(func() time.Time { return now.Add(-1 * params.BeaconConfig().MaximumGossipClockDisparityDuration() / 2) })) ini = Initializer{shared: &sharedResources{clock: closeClock}} - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) - require.NoError(t, v.SlotNotTooEarly()) + v = ini.NewBlobVerifier(b, GossipSidecarRequirements) + require.NoError(t, v.NotFromFutureSlot()) // This clock will give a current slot of 0, with now coming more than max clock disparity before slot 1 disparate := now.Add(-2 * params.BeaconConfig().MaximumGossipClockDisparityDuration()) dispClock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(func() time.Time { return disparate })) // Set up initializer to use the clock that will set now to a little to far before slot 1 ini = Initializer{shared: &sharedResources{clock: dispClock}} - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) - require.ErrorIs(t, v.SlotNotTooEarly(), ErrSlotTooEarly) - require.Equal(t, true, v.results.executed(RequireSlotNotTooEarly)) - require.NotNil(t, v.results.result(RequireSlotNotTooEarly)) + v = ini.NewBlobVerifier(b, GossipSidecarRequirements) + require.ErrorIs(t, v.NotFromFutureSlot(), ErrFromFutureSlot) + require.Equal(t, true, v.results.executed(RequireNotFromFutureSlot)) + require.NotNil(t, v.results.result(RequireNotFromFutureSlot)) } func TestSlotAboveFinalized(t *testing.T) { @@ -114,7 +114,7 @@ func TestSlotAboveFinalized(t *testing.T) { _, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1) b := blobs[0] b.SignedBlockHeader.Header.Slot = c.slot - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) err := v.SlotAboveFinalized() require.Equal(t, true, v.results.executed(RequireSlotAboveFinalized)) if c.err == nil { @@ -146,7 +146,7 @@ func TestValidProposerSignature_Cached(t *testing.T) { }, } ini := Initializer{shared: &sharedResources{sc: sc, sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.ValidProposerSignature(ctx)) require.Equal(t, true, v.results.executed(RequireValidProposerSignature)) require.NoError(t, v.results.result(RequireValidProposerSignature)) @@ -159,7 +159,7 @@ func TestValidProposerSignature_Cached(t *testing.T) { return true, errors.New("derp") } ini = Initializer{shared: &sharedResources{sc: sc, sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}}} - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) 
+ v = ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.ValidProposerSignature(ctx), ErrInvalidProposerSignature) require.Equal(t, true, v.results.executed(RequireValidProposerSignature)) require.NotNil(t, v.results.result(RequireValidProposerSignature)) @@ -182,14 +182,14 @@ func TestValidProposerSignature_CacheMiss(t *testing.T) { }, } ini := Initializer{shared: &sharedResources{sc: sc, sr: sbrForValOverride(b.ProposerIndex(), ðpb.Validator{})}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.ValidProposerSignature(ctx)) require.Equal(t, true, v.results.executed(RequireValidProposerSignature)) require.NoError(t, v.results.result(RequireValidProposerSignature)) // simulate state not found ini = Initializer{shared: &sharedResources{sc: sc, sr: sbrNotFound(t, expectedSd.Parent)}} - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v = ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.ValidProposerSignature(ctx), ErrInvalidProposerSignature) require.Equal(t, true, v.results.executed(RequireValidProposerSignature)) require.NotNil(t, v.results.result(RequireValidProposerSignature)) @@ -206,7 +206,7 @@ func TestValidProposerSignature_CacheMiss(t *testing.T) { }, } ini = Initializer{shared: &sharedResources{sc: sc, sr: sbr}} - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v = ini.NewBlobVerifier(b, GossipSidecarRequirements) // make sure all the histories are clean before calling the method // so we don't get polluted by previous usages @@ -255,14 +255,14 @@ func TestSidecarParentSeen(t *testing.T) { t.Run("happy path", func(t *testing.T) { ini := Initializer{shared: &sharedResources{fc: fcHas}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarParentSeen(nil)) require.Equal(t, true, v.results.executed(RequireSidecarParentSeen)) require.NoError(t, v.results.result(RequireSidecarParentSeen)) }) t.Run("HasNode false, no badParent cb, expected error", func(t *testing.T) { ini := Initializer{shared: &sharedResources{fc: fcLacks}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarParentSeen(nil), ErrSidecarParentNotSeen) require.Equal(t, true, v.results.executed(RequireSidecarParentSeen)) require.NotNil(t, v.results.result(RequireSidecarParentSeen)) @@ -270,14 +270,14 @@ func TestSidecarParentSeen(t *testing.T) { t.Run("HasNode false, badParent true", func(t *testing.T) { ini := Initializer{shared: &sharedResources{fc: fcLacks}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarParentSeen(badParentCb(t, b.ParentRoot(), true))) require.Equal(t, true, v.results.executed(RequireSidecarParentSeen)) require.NoError(t, v.results.result(RequireSidecarParentSeen)) }) t.Run("HasNode false, badParent false", func(t *testing.T) { ini := Initializer{shared: &sharedResources{fc: fcLacks}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) 
+ v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarParentSeen(badParentCb(t, b.ParentRoot(), false)), ErrSidecarParentNotSeen) require.Equal(t, true, v.results.executed(RequireSidecarParentSeen)) require.NotNil(t, v.results.result(RequireSidecarParentSeen)) @@ -289,14 +289,14 @@ func TestSidecarParentValid(t *testing.T) { b := blobs[0] t.Run("parent valid", func(t *testing.T) { ini := Initializer{shared: &sharedResources{}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarParentValid(badParentCb(t, b.ParentRoot(), false))) require.Equal(t, true, v.results.executed(RequireSidecarParentValid)) require.NoError(t, v.results.result(RequireSidecarParentValid)) }) t.Run("parent not valid", func(t *testing.T) { ini := Initializer{shared: &sharedResources{}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarParentValid(badParentCb(t, b.ParentRoot(), true)), ErrSidecarParentInvalid) require.Equal(t, true, v.results.executed(RequireSidecarParentValid)) require.NotNil(t, v.results.result(RequireSidecarParentValid)) @@ -340,7 +340,7 @@ func TestSidecarParentSlotLower(t *testing.T) { } return c.fcSlot, c.fcErr }}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) err := v.SidecarParentSlotLower() require.Equal(t, true, v.results.executed(RequireSidecarParentSlotLower)) if c.err == nil { @@ -364,7 +364,7 @@ func TestSidecarDescendsFromFinalized(t *testing.T) { } return false }}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarDescendsFromFinalized(), ErrSidecarNotFinalizedDescendent) require.Equal(t, true, v.results.executed(RequireSidecarDescendsFromFinalized)) require.NotNil(t, v.results.result(RequireSidecarDescendsFromFinalized)) @@ -376,7 +376,7 @@ func TestSidecarDescendsFromFinalized(t *testing.T) { } return true }}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarDescendsFromFinalized()) require.Equal(t, true, v.results.executed(RequireSidecarDescendsFromFinalized)) require.NoError(t, v.results.result(RequireSidecarDescendsFromFinalized)) @@ -389,7 +389,7 @@ func TestSidecarInclusionProven(t *testing.T) { b := blobs[0] ini := Initializer{} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarInclusionProven()) require.Equal(t, true, v.results.executed(RequireSidecarInclusionProven)) require.NoError(t, v.results.result(RequireSidecarInclusionProven)) @@ -397,7 +397,7 @@ func TestSidecarInclusionProven(t *testing.T) { // Invert bits of the first byte of the body root to mess up the proof byte0 := b.SignedBlockHeader.Header.BodyRoot[0] b.SignedBlockHeader.Header.BodyRoot[0] = byte0 ^ 255 - v = ini.NewBlobVerifier(b, GossipSidecarRequirements...) 
+ v = ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarInclusionProven(), ErrSidecarInclusionProofInvalid) require.Equal(t, true, v.results.executed(RequireSidecarInclusionProven)) require.NotNil(t, v.results.result(RequireSidecarInclusionProven)) @@ -407,20 +407,20 @@ func TestSidecarKzgProofVerified(t *testing.T) { // GenerateTestDenebBlockWithSidecar is supposed to generate valid commitments _, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 1) b := blobs[0] - passes := func(vb blocks.ROBlob) error { - require.Equal(t, true, bytes.Equal(b.KzgCommitment, vb.KzgCommitment)) + passes := func(vb ...blocks.ROBlob) error { + require.Equal(t, true, bytes.Equal(b.KzgCommitment, vb[0].KzgCommitment)) return nil } - v := &BlobVerifier{verifyBlobCommitment: passes, results: newResults(), blob: b} + v := &ROBlobVerifier{verifyBlobCommitment: passes, results: newResults(), blob: b} require.NoError(t, v.SidecarKzgProofVerified()) require.Equal(t, true, v.results.executed(RequireSidecarKzgProofVerified)) require.NoError(t, v.results.result(RequireSidecarKzgProofVerified)) - fails := func(vb blocks.ROBlob) error { - require.Equal(t, true, bytes.Equal(b.KzgCommitment, vb.KzgCommitment)) + fails := func(vb ...blocks.ROBlob) error { + require.Equal(t, true, bytes.Equal(b.KzgCommitment, vb[0].KzgCommitment)) return errors.New("bad blob") } - v = &BlobVerifier{verifyBlobCommitment: fails, results: newResults(), blob: b} + v = &ROBlobVerifier{results: newResults(), blob: b, verifyBlobCommitment: fails} require.ErrorIs(t, v.SidecarKzgProofVerified(), ErrSidecarKzgProofInvalid) require.Equal(t, true, v.results.executed(RequireSidecarKzgProofVerified)) require.NotNil(t, v.results.result(RequireSidecarKzgProofVerified)) @@ -432,21 +432,21 @@ func TestSidecarProposerExpected(t *testing.T) { b := blobs[0] t.Run("cached, matches", func(t *testing.T) { ini := Initializer{shared: &sharedResources{pc: &mockProposerCache{ProposerCB: pcReturnsIdx(b.ProposerIndex())}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarProposerExpected(ctx)) require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected)) require.NoError(t, v.results.result(RequireSidecarProposerExpected)) }) t.Run("cached, does not match", func(t *testing.T) { ini := Initializer{shared: &sharedResources{pc: &mockProposerCache{ProposerCB: pcReturnsIdx(b.ProposerIndex() + 1)}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer) require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected)) require.NotNil(t, v.results.result(RequireSidecarProposerExpected)) }) t.Run("not cached, state lookup failure", func(t *testing.T) { ini := Initializer{shared: &sharedResources{sr: sbrNotFound(t, b.ParentRoot()), pc: &mockProposerCache{ProposerCB: pcReturnsNotFound()}}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) 
+ v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer) require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected)) require.NotNil(t, v.results.result(RequireSidecarProposerExpected)) @@ -462,7 +462,7 @@ func TestSidecarProposerExpected(t *testing.T) { }, } ini := Initializer{shared: &sharedResources{sr: sbrForValOverride(b.ProposerIndex(), ðpb.Validator{}), pc: pc}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.NoError(t, v.SidecarProposerExpected(ctx)) require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected)) require.NoError(t, v.results.result(RequireSidecarProposerExpected)) @@ -477,7 +477,7 @@ func TestSidecarProposerExpected(t *testing.T) { }, } ini := Initializer{shared: &sharedResources{sr: sbrForValOverride(b.ProposerIndex(), ðpb.Validator{}), pc: pc}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer) require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected)) require.NotNil(t, v.results.result(RequireSidecarProposerExpected)) @@ -492,7 +492,7 @@ func TestSidecarProposerExpected(t *testing.T) { }, } ini := Initializer{shared: &sharedResources{sr: sbrForValOverride(b.ProposerIndex(), ðpb.Validator{}), pc: pc}} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer) require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected)) require.NotNil(t, v.results.result(RequireSidecarProposerExpected)) @@ -503,7 +503,7 @@ func TestRequirementSatisfaction(t *testing.T) { _, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 1) b := blobs[0] ini := Initializer{} - v := ini.NewBlobVerifier(b, GossipSidecarRequirements...) + v := ini.NewBlobVerifier(b, GossipSidecarRequirements) _, err := v.VerifiedROBlob() require.ErrorIs(t, err, ErrBlobInvalid) diff --git a/beacon-chain/verification/fake.go b/beacon-chain/verification/fake.go index 342b0fcfddf2..128bbad0580c 100644 --- a/beacon-chain/verification/fake.go +++ b/beacon-chain/verification/fake.go @@ -1,6 +1,10 @@ package verification -import "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" +import ( + "testing" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" +) // BlobSidecarNoop is a FAKE verification function that simply launders a ROBlob->VerifiedROBlob. // TODO: find all code that uses this method and replace it with full verification. @@ -17,3 +21,24 @@ func BlobSidecarSliceNoop(b []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) { } return vbs, nil } + +// FakeVerifyForTest can be used by tests that need a VerifiedROBlob but don't want to do all the +// expensive set up to perform full validation. +func FakeVerifyForTest(t *testing.T, b blocks.ROBlob) blocks.VerifiedROBlob { + // log so that t is truly required + t.Log("producing fake VerifiedROBlob for a test") + return blocks.NewVerifiedROBlob(b) +} + +// FakeVerifySliceForTest can be used by tests that need a []VerifiedROBlob but don't want to do all the +// expensive set up to perform full validation. 
+func FakeVerifySliceForTest(t *testing.T, b []blocks.ROBlob) []blocks.VerifiedROBlob {
+	// log so that t is truly required
+	t.Log("producing fake []VerifiedROBlob for a test")
+	// wrap each ROBlob in a VerifiedROBlob without performing any verification.
+	vbs := make([]blocks.VerifiedROBlob, len(b))
+	for i := range b {
+		vbs[i] = blocks.NewVerifiedROBlob(b[i])
+	}
+	return vbs
+}
diff --git a/beacon-chain/verification/initializer.go b/beacon-chain/verification/initializer.go
index caa69a59a234..499bb7d5b79d 100644
--- a/beacon-chain/verification/initializer.go
+++ b/beacon-chain/verification/initializer.go
@@ -45,12 +45,12 @@ type Initializer struct {
 }
 
 // NewBlobVerifier creates a BlobVerifier for a single blob, with the given set of requirements.
-func (ini *Initializer) NewBlobVerifier(b blocks.ROBlob, reqs ...Requirement) *BlobVerifier {
-	return &BlobVerifier{
+func (ini *Initializer) NewBlobVerifier(b blocks.ROBlob, reqs []Requirement) *ROBlobVerifier {
+	return &ROBlobVerifier{
 		sharedResources:      ini.shared,
 		blob:                 b,
 		results:              newResults(reqs...),
-		verifyBlobCommitment: kzg.VerifyROBlobCommitment,
+		verifyBlobCommitment: kzg.Verify,
 	}
 }
 
diff --git a/beacon-chain/verification/interface.go b/beacon-chain/verification/interface.go
new file mode 100644
index 000000000000..94a15655b86b
--- /dev/null
+++ b/beacon-chain/verification/interface.go
@@ -0,0 +1,31 @@
+package verification
+
+import (
+	"context"
+
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
+)
+
+// BlobVerifier defines the methods implemented by the ROBlobVerifier.
+// It is mainly intended to make mocks and tests more straightforward, and to deal
+// with the awkwardness of mocking a concrete type that returns a concrete type
+// in tests outside of this package.
+type BlobVerifier interface {
+	VerifiedROBlob() (blocks.VerifiedROBlob, error)
+	BlobIndexInBounds() (err error)
+	NotFromFutureSlot() (err error)
+	SlotAboveFinalized() (err error)
+	ValidProposerSignature(ctx context.Context) (err error)
+	SidecarParentSeen(parentSeen func([32]byte) bool) (err error)
+	SidecarParentValid(badParent func([32]byte) bool) (err error)
+	SidecarParentSlotLower() (err error)
+	SidecarDescendsFromFinalized() (err error)
+	SidecarInclusionProven() (err error)
+	SidecarKzgProofVerified() (err error)
+	SidecarProposerExpected(ctx context.Context) (err error)
+	SatisfyRequirement(Requirement)
+}
+
+// NewBlobVerifier is a function signature that can be used by code that needs to be
+// able to mock Initializer.NewBlobVerifier without complex setup.
+type NewBlobVerifier func(b blocks.ROBlob, reqs []Requirement) BlobVerifier
diff --git a/beacon-chain/verification/mock.go b/beacon-chain/verification/mock.go
new file mode 100644
index 000000000000..9cf13c6cee54
--- /dev/null
+++ b/beacon-chain/verification/mock.go
@@ -0,0 +1,74 @@
+package verification
+
+import (
+	"context"
+
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
+)
+
+type MockBlobVerifier struct {
+	ErrBlobIndexInBounds            error
+	ErrSlotTooEarly                 error
+	ErrSlotAboveFinalized           error
+	ErrValidProposerSignature       error
+	ErrSidecarParentSeen            error
+	ErrSidecarParentValid           error
+	ErrSidecarParentSlotLower       error
+	ErrSidecarDescendsFromFinalized error
+	ErrSidecarInclusionProven       error
+	ErrSidecarKzgProofVerified      error
+	ErrSidecarProposerExpected      error
+	cbVerifiedROBlob                func() (blocks.VerifiedROBlob, error)
+}
+
+func (m *MockBlobVerifier) VerifiedROBlob() (blocks.VerifiedROBlob, error) {
+	return m.cbVerifiedROBlob()
+}
+
+func (m *MockBlobVerifier) BlobIndexInBounds() (err error) {
+	return m.ErrBlobIndexInBounds
+}
+
+func (m *MockBlobVerifier) NotFromFutureSlot() (err error) {
+	return m.ErrSlotTooEarly
+}
+
+func (m *MockBlobVerifier) SlotAboveFinalized() (err error) {
+	return m.ErrSlotAboveFinalized
+}
+
+func (m *MockBlobVerifier) ValidProposerSignature(_ context.Context) (err error) {
+	return m.ErrValidProposerSignature
+}
+
+func (m *MockBlobVerifier) SidecarParentSeen(_ func([32]byte) bool) (err error) {
+	return m.ErrSidecarParentSeen
+}
+
+func (m *MockBlobVerifier) SidecarParentValid(_ func([32]byte) bool) (err error) {
+	return m.ErrSidecarParentValid
+}
+
+func (m *MockBlobVerifier) SidecarParentSlotLower() (err error) {
+	return m.ErrSidecarParentSlotLower
+}
+
+func (m *MockBlobVerifier) SidecarDescendsFromFinalized() (err error) {
+	return m.ErrSidecarDescendsFromFinalized
+}
+
+func (m *MockBlobVerifier) SidecarInclusionProven() (err error) {
+	return m.ErrSidecarInclusionProven
+}
+
+func (m *MockBlobVerifier) SidecarKzgProofVerified() (err error) {
+	return m.ErrSidecarKzgProofVerified
+}
+
+func (m *MockBlobVerifier) SidecarProposerExpected(_ context.Context) (err error) {
+	return m.ErrSidecarProposerExpected
+}
+
+func (*MockBlobVerifier) SatisfyRequirement(_ Requirement) {}
+
+var _ BlobVerifier = &MockBlobVerifier{}
diff --git a/consensus-types/blocks/roblock.go b/consensus-types/blocks/roblock.go
index 9b7c21023970..be26e6c08707 100644
--- a/consensus-types/blocks/roblock.go
+++ b/consensus-types/blocks/roblock.go
@@ -74,14 +74,18 @@ func (s ROBlockSlice) Len() int {
 	return len(s)
 }
 
-type BlockWithVerifiedBlobs struct {
+// BlockWithROBlobs is a wrapper that collects the block and blob values together.
+// This is helpful because these values are collated from separate RPC requests.
+type BlockWithROBlobs struct {
 	Block ROBlock
 	Blobs []ROBlob
 }
 
-type BlockWithVerifiedBlobsSlice []BlockWithVerifiedBlobs
+// BlockWithROBlobsSlice gives convenient access to a slice of just the ROBlocks,
+// and defines sorting helpers.
+type BlockWithROBlobsSlice []BlockWithROBlobs
 
-func (s BlockWithVerifiedBlobsSlice) ROBlocks() []ROBlock {
+func (s BlockWithROBlobsSlice) ROBlocks() []ROBlock {
 	r := make([]ROBlock, len(s))
 	for i := range s {
 		r[i] = s[i].Block
@@ -92,7 +96,7 @@ func (s BlockWithVerifiedBlobsSlice) ROBlocks() []ROBlock {
 // Less reports whether the element with index i must sort before the element with index j.
// ROBlocks are ordered first by their slot, // with a lexicographic sort of roots breaking ties for slots with duplicate blocks. -func (s BlockWithVerifiedBlobsSlice) Less(i, j int) bool { +func (s BlockWithROBlobsSlice) Less(i, j int) bool { si, sj := s[i].Block.Block().Slot(), s[j].Block.Block().Slot() // lower slot wins @@ -106,11 +110,11 @@ func (s BlockWithVerifiedBlobsSlice) Less(i, j int) bool { } // Swap swaps the elements with indexes i and j. -func (s BlockWithVerifiedBlobsSlice) Swap(i, j int) { +func (s BlockWithROBlobsSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // Len is the number of elements in the collection. -func (s BlockWithVerifiedBlobsSlice) Len() int { +func (s BlockWithROBlobsSlice) Len() int { return len(s) } diff --git a/runtime/logging/blob.go b/runtime/logging/blob.go index 848bf6146dd2..728c79beb6f2 100644 --- a/runtime/logging/blob.go +++ b/runtime/logging/blob.go @@ -19,3 +19,14 @@ func BlobFields(blob blocks.ROBlob) logrus.Fields { "index": blob.Index, } } + +// BlockFieldsFromBlob extracts the set of fields from a given BlobSidecar which are shared by the block and +// all other sidecars for the block. +func BlockFieldsFromBlob(blob blocks.ROBlob) logrus.Fields { + return logrus.Fields{ + "slot": blob.Slot(), + "proposer_index": blob.ProposerIndex(), + "block_root": fmt.Sprintf("%#x", blob.BlockRoot()), + "parent_root": fmt.Sprintf("%#x", blob.ParentRoot()), + } +} diff --git a/testing/spectest/shared/common/forkchoice/builder.go b/testing/spectest/shared/common/forkchoice/builder.go index f8243707cfdf..848df5b115e4 100644 --- a/testing/spectest/shared/common/forkchoice/builder.go +++ b/testing/spectest/shared/common/forkchoice/builder.go @@ -90,7 +90,7 @@ func (bb *Builder) InvalidBlock(t testing.TB, b interfaces.ReadOnlySignedBeaconB r := bb.block(t, b) ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second) defer cancel() - require.Equal(t, true, bb.service.ReceiveBlock(ctx, b, r) != nil) + require.Equal(t, true, bb.service.ReceiveBlock(ctx, b, r, nil) != nil) } // ValidBlock receives the valid block and notifies forkchoice. @@ -98,7 +98,7 @@ func (bb *Builder) ValidBlock(t testing.TB, b interfaces.ReadOnlySignedBeaconBlo r := bb.block(t, b) ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(params.BeaconConfig().SecondsPerSlot)*time.Second) defer cancel() - require.NoError(t, bb.service.ReceiveBlock(ctx, b, r)) + require.NoError(t, bb.service.ReceiveBlock(ctx, b, r, nil)) } // PoWBlock receives the block and notifies a mocked execution engine.
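The new verification/interface.go and verification/mock.go files above exist so that consumers such as initial-sync can depend on the BlobVerifier interface and inject MockBlobVerifier in unit tests, instead of constructing a concrete ROBlobVerifier through a fully wired Initializer. The sketch below is illustrative only and is not part of this patch: the example package, the processSidecar helper, and its particular choice and ordering of checks are assumptions made solely to show how the NewBlobVerifier function type might be consumed.

package example

import (
	"context"

	"github.com/pkg/errors"
	"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
)

// processSidecar is a hypothetical consumer that depends on the BlobVerifier
// interface rather than the concrete ROBlobVerifier, so tests can substitute
// verification.MockBlobVerifier through the NewBlobVerifier function type.
func processSidecar(ctx context.Context, newVerifier verification.NewBlobVerifier, sc blocks.ROBlob, reqs []verification.Requirement) (blocks.VerifiedROBlob, error) {
	v := newVerifier(sc, reqs)
	if err := v.BlobIndexInBounds(); err != nil {
		return blocks.VerifiedROBlob{}, errors.Wrap(err, "blob index out of bounds")
	}
	if err := v.ValidProposerSignature(ctx); err != nil {
		return blocks.VerifiedROBlob{}, errors.Wrap(err, "invalid proposer signature")
	}
	if err := v.SidecarInclusionProven(); err != nil {
		return blocks.VerifiedROBlob{}, errors.Wrap(err, "invalid inclusion proof")
	}
	if err := v.SidecarKzgProofVerified(); err != nil {
		return blocks.VerifiedROBlob{}, errors.Wrap(err, "invalid kzg proof")
	}
	// For the concrete ROBlobVerifier, this returns an error unless every
	// requirement in reqs has a passing result recorded.
	return v.VerifiedROBlob()
}

In a test, the same helper could be driven by a closure that returns a *verification.MockBlobVerifier with, for example, ErrSidecarKzgProofVerified set, exercising the failure path without invoking any real KZG verification.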