Skip to content

Commit

Permalink
VerifiedROBlobs in initial-sync (#13351)
Browse files Browse the repository at this point in the history
* Use VerifiedROBlobs in initial-sync

* Update beacon-chain/das/cache.go

Co-authored-by: Justin Traglia <[email protected]>

* Apply suggestions from code review

comment fixes

Co-authored-by: Justin Traglia <[email protected]>

* fix lint error from gh web ui

* deepsource fixes

* more deepsource

* fix init wiring

* mark blobless blocks verified in batch mode

* move sig check after parent checks

* validate block commitment length at start of da check

* remove vestigial locking

* rm more copy-locksta

* rm old comment

* fail the entire batch if any sidecar fails

* lint

* skip redundant checks, fix len check

* assume sig and proposer checks passed for block

* inherits most checks from processed block

* Assume block processing handles most checks

* lint

* cleanup unused call and gaz

* more detailed logging for e2e

* fix bad refactor breaking non-finalized init-sync

* self-review cleanup

* gaz

* Update beacon-chain/verification/blob.go

Co-authored-by: Justin Traglia <[email protected]>

* terence and justin feedback

---------

Co-authored-by: Kasey Kirkham <[email protected]>
Co-authored-by: Justin Traglia <[email protected]>
  • Loading branch information
3 people authored Jan 6, 2024
1 parent 8d092a1 commit 0e043d5
Show file tree
Hide file tree
Showing 46 changed files with 1,209 additions and 454 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_library(
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/db/filters:go_default_library",
Expand Down Expand Up @@ -141,6 +142,7 @@ go_test(
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/db/testing:go_default_library",
Expand Down
23 changes: 20 additions & 3 deletions beacon-chain/blockchain/kzg/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,26 @@ func IsDataAvailable(commitments [][]byte, sidecars []*ethpb.DeprecatedBlobSidec
return kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs)
}

// VerifyROBlobCommitment is a helper that massages the fields of an ROBlob into the types needed to call VerifyBlobKZGProof.
func VerifyROBlobCommitment(sc blocks.ROBlob) error {
return kzgContext.VerifyBlobKZGProof(bytesToBlob(sc.Blob), bytesToCommitment(sc.KzgCommitment), bytesToKZGProof(sc.KzgProof))
// Verify performs single or batch verification of commitments depending on the number of given BlobSidecars.
func Verify(sidecars ...blocks.ROBlob) error {
if len(sidecars) == 0 {
return nil
}
if len(sidecars) == 1 {
return kzgContext.VerifyBlobKZGProof(
bytesToBlob(sidecars[0].Blob),
bytesToCommitment(sidecars[0].KzgCommitment),
bytesToKZGProof(sidecars[0].KzgProof))
}
blobs := make([]GoKZG.Blob, len(sidecars))
cmts := make([]GoKZG.KZGCommitment, len(sidecars))
proofs := make([]GoKZG.KZGProof, len(sidecars))
for i, sidecar := range sidecars {
blobs[i] = bytesToBlob(sidecar.Blob)
cmts[i] = bytesToCommitment(sidecar.KzgCommitment)
proofs[i] = bytesToKZGProof(sidecar.KzgProof)
}
return kzgContext.VerifyBlobKZGProofBatch(blobs, cmts, proofs)
}

func bytesToBlob(blob []byte) (ret GoKZG.Blob) {
Expand Down
38 changes: 4 additions & 34 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
Expand Down Expand Up @@ -122,7 +123,7 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD
return preStateVersion, preStateHeader, nil
}

func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock) error {
func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock, avs das.AvailabilityStore) error {
ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch")
defer span.End()

Expand Down Expand Up @@ -225,8 +226,8 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return err
}
}
if err := s.databaseDACheck(ctx, b); err != nil {
return errors.Wrap(err, "could not validate blob data availability")
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil {
return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot())
}
args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(),
JustifiedCheckpoint: jCheckpoints[i],
Expand Down Expand Up @@ -293,37 +294,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo
return s.saveHeadNoDB(ctx, lastB, lastBR, preState, !isValidPayload)
}

func commitmentsToCheck(b consensusblocks.ROBlock, current primitives.Slot) [][]byte {
if b.Version() < version.Deneb {
return nil
}
// We are only required to check within MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) {
return nil
}
kzgCommitments, err := b.Block().Body().BlobKzgCommitments()
if err != nil {
return nil
}
return kzgCommitments
}

func (s *Service) databaseDACheck(ctx context.Context, b consensusblocks.ROBlock) error {
commitments := commitmentsToCheck(b, s.CurrentSlot())
if len(commitments) == 0 {
return nil
}
missing, err := missingIndices(s.blobStorage, b.Root(), commitments)
if err != nil {
return err
}
if len(missing) == 0 {
return nil
}
// TODO: don't worry that this error isn't informative, it will be superceded by a detailed sidecar cache error.
return errors.New("not all kzg commitments are available")
}

func (s *Service) updateEpochBoundaryCaches(ctx context.Context, st state.BeaconState) error {
e := coreTime.CurrentEpoch(st)
if err := helpers.UpdateCommitteeCache(ctx, st, e); err != nil {
Expand Down
77 changes: 4 additions & 73 deletions beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockchain

import (
"bytes"
"context"
"fmt"
"math/big"
Expand All @@ -17,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
testDB "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
Expand All @@ -40,7 +40,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
"github.com/prysmaticlabs/prysm/v4/time/slots"
logTest "github.com/sirupsen/logrus/hooks/test"
)

Expand Down Expand Up @@ -69,7 +68,7 @@ func TestStore_OnBlockBatch(t *testing.T) {
require.NoError(t, err)
blks = append(blks, rwsb)
}
err := service.onBlockBatch(ctx, blks)
err := service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{})
require.NoError(t, err)
jcp := service.CurrentJustifiedCheckpt()
jroot := bytesutil.ToBytes32(jcp.Root)
Expand Down Expand Up @@ -99,7 +98,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) {
require.NoError(t, service.saveInitSyncBlock(ctx, rwsb.Root(), wsb))
blks = append(blks, rwsb)
}
require.NoError(t, service.onBlockBatch(ctx, blks))
require.NoError(t, service.onBlockBatch(ctx, blks, &das.MockAvailabilityStore{}))
}

func TestCachedPreState_CanGetFromStateSummary(t *testing.T) {
Expand Down Expand Up @@ -1946,7 +1945,7 @@ func TestNoViableHead_Reboot(t *testing.T) {
rwsb, err := consensusblocks.NewROBlock(wsb)
require.NoError(t, err)
// We use onBlockBatch here because the valid chain is missing in forkchoice
require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}))
require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}, &das.MockAvailabilityStore{}))
// Check that the head is now VALID and the node is not optimistic
require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot()))
headRoot, err = service.HeadRoot(ctx)
Expand Down Expand Up @@ -2049,74 +2048,6 @@ func driftGenesisTime(s *Service, slot, delay int64) {
s.SetGenesisTime(time.Unix(time.Now().Unix()-offset, 0))
}

func Test_commitmentsToCheck(t *testing.T) {
windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest)
require.NoError(t, err)
commits := [][]byte{
bytesutil.PadTo([]byte("a"), 48),
bytesutil.PadTo([]byte("b"), 48),
bytesutil.PadTo([]byte("c"), 48),
bytesutil.PadTo([]byte("d"), 48),
}
cases := []struct {
name string
commits [][]byte
block func(*testing.T) consensusblocks.ROBlock
slot primitives.Slot
}{
{
name: "pre deneb",
block: func(t *testing.T) consensusblocks.ROBlock {
bb := util.NewBeaconBlockBellatrix()
sb, err := consensusblocks.NewSignedBeaconBlock(bb)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
},
{
name: "commitments within da",
block: func(t *testing.T) consensusblocks.ROBlock {
d := util.NewBeaconBlockDeneb()
d.Block.Body.BlobKzgCommitments = commits
d.Block.Slot = 100
sb, err := consensusblocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
commits: commits,
slot: 100,
},
{
name: "commitments outside da",
block: func(t *testing.T) consensusblocks.ROBlock {
d := util.NewBeaconBlockDeneb()
// block is from slot 0, "current slot" is window size +1 (so outside the window)
d.Block.Body.BlobKzgCommitments = commits
sb, err := consensusblocks.NewSignedBeaconBlock(d)
require.NoError(t, err)
rb, err := consensusblocks.NewROBlock(sb)
require.NoError(t, err)
return rb
},
slot: windowSlots + 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
b := c.block(t)
co := commitmentsToCheck(b, c.slot)
require.Equal(t, len(c.commits), len(co))
for i := 0; i < len(c.commits); i++ {
require.Equal(t, true, bytes.Equal(c.commits[i], co[i]))
}
})
}
}

func TestMissingIndices(t *testing.T) {
cases := []struct {
name string
Expand Down
26 changes: 18 additions & 8 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
coreTime "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/features"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
Expand All @@ -33,8 +34,8 @@ var epochsSinceFinalitySaveHotStateDB = primitives.Epoch(100)

// BlockReceiver interface defines the methods of chain service for receiving and processing new blocks.
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error
HasBlock(ctx context.Context, root [32]byte) bool
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
BlockBeingSynced([32]byte) bool
Expand All @@ -56,7 +57,7 @@ type SlashingReceiver interface {
// 1. Validate block, apply state transition and update checkpoints
// 2. Apply fork choice to the processed block
// 3. Save latest head info
func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error {
func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, avs das.AvailabilityStore) error {
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlock")
defer span.End()
// Return early if the block has been synced
Expand All @@ -72,6 +73,10 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err != nil {
return err
}
rob, err := blocks.NewROBlockWithRoot(block, blockRoot)
if err != nil {
return err
}

preState, err := s.getBlockPreState(ctx, blockCopy.Block())
if err != nil {
Expand Down Expand Up @@ -106,10 +111,15 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err := eg.Wait(); err != nil {
return err
}

daStartTime := time.Now()
if err := s.isDataAvailable(ctx, blockRoot, blockCopy); err != nil {
return errors.Wrap(err, "could not validate blob data availability")
if avs != nil {
if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil {
return errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)")
}
} else {
if err := s.isDataAvailable(ctx, blockRoot, blockCopy); err != nil {
return errors.Wrap(err, "could not validate blob data availability")
}
}
daWaitedTime := time.Since(daStartTime)

Expand Down Expand Up @@ -203,15 +213,15 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning
// the state, performing batch verification of all collected signatures and then performing the appropriate
// actions for a block post-transition.
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error {
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock, avs das.AvailabilityStore) error {
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch")
defer span.End()

s.cfg.ForkChoiceStore.Lock()
defer s.cfg.ForkChoiceStore.Unlock()

// Apply state transition on the incoming newly received block batches, one by one.
if err := s.onBlockBatch(ctx, blocks); err != nil {
if err := s.onBlockBatch(ctx, blocks, avs); err != nil {
err := errors.Wrap(err, "could not process block in batch")
tracing.AnnotateError(span, err)
return err
Expand Down
7 changes: 4 additions & 3 deletions beacon-chain/blockchain/receive_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

blockchainTesting "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
Expand Down Expand Up @@ -146,7 +147,7 @@ func TestService_ReceiveBlock(t *testing.T) {
require.NoError(t, err)
wsb, err := blocks.NewSignedBeaconBlock(tt.args.block)
require.NoError(t, err)
err = s.ReceiveBlock(ctx, wsb, root)
err = s.ReceiveBlock(ctx, wsb, root, nil)
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestService_ReceiveBlockUpdateHead(t *testing.T) {
go func() {
wsb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, s.ReceiveBlock(ctx, wsb, root))
require.NoError(t, s.ReceiveBlock(ctx, wsb, root, nil))
wg.Done()
}()
wg.Wait()
Expand Down Expand Up @@ -243,7 +244,7 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
require.NoError(t, err)
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb})
err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb}, &das.MockAvailabilityStore{})
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/blockchain/testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/state:go_default_library",
Expand Down
Loading

0 comments on commit 0e043d5

Please sign in to comment.