diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index b69436fdc500..ef7d9f9f6a09 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "chain_info.go", "chain_info_forkchoice.go", "currently_syncing_block.go", + "defragment.go", "error.go", "execution_engine.go", "forkchoice_update_execution.go", diff --git a/beacon-chain/blockchain/defragment.go b/beacon-chain/blockchain/defragment.go new file mode 100644 index 000000000000..2f3c888e5880 --- /dev/null +++ b/beacon-chain/blockchain/defragment.go @@ -0,0 +1,27 @@ +package blockchain + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v4/config/features" + "github.com/prysmaticlabs/prysm/v4/time" +) + +var stateDefragmentationTime = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "head_state_defragmentation_milliseconds", + Help: "Milliseconds it takes to defragment the head state", +}) + +// This method defragments our state, so that any specific fields which have +// a higher number of fragmented indexes are reallocated to a new separate slice for +// that field. +func (s *Service) defragmentState(st state.BeaconState) { + if !features.Get().EnableExperimentalState { + return + } + startTime := time.Now() + st.Defragment() + elapsedTime := time.Since(startTime) + stateDefragmentationTime.Observe(float64(elapsedTime.Milliseconds())) +} diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 581b0bdbd3d5..66d2647c756a 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -123,6 +123,9 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig } daWaitedTime := time.Since(daStartTime) + // Defragment the state before continuing block processing. + s.defragmentState(postState) + // The rest of block processing takes a lock on forkchoice. s.cfg.ForkChoiceStore.Lock() defer s.cfg.ForkChoiceStore.Unlock() diff --git a/beacon-chain/state/interfaces.go b/beacon-chain/state/interfaces.go index 9b052cd3e739..10e93d65bd09 100644 --- a/beacon-chain/state/interfaces.go +++ b/beacon-chain/state/interfaces.go @@ -22,6 +22,7 @@ type BeaconState interface { WriteOnlyBeaconState Copy() BeaconState CopyAllTries() + Defragment() HashTreeRoot(ctx context.Context) ([32]byte, error) Prover json.Marshaler diff --git a/beacon-chain/state/state-native/multi_value_slices.go b/beacon-chain/state/state-native/multi_value_slices.go index de7b47e55f6a..0e2385edc7e8 100644 --- a/beacon-chain/state/state-native/multi_value_slices.go +++ b/beacon-chain/state/state-native/multi_value_slices.go @@ -123,6 +123,55 @@ func NewMultiValueValidators(vals []*ethpb.Validator) *MultiValueValidators { return mv } +// Defragment checks whether each individual multi-value field in our state is fragmented +// and if it is, it will 'reset' the field to create a new multivalue object. +func (b *BeaconState) Defragment() { + b.lock.Lock() + defer b.lock.Unlock() + if b.blockRootsMultiValue != nil && b.blockRootsMultiValue.IsFragmented() { + initialMVslice := b.blockRootsMultiValue + b.blockRootsMultiValue = b.blockRootsMultiValue.Reset(b) + initialMVslice.Detach(b) + multiValueCountGauge.WithLabelValues(types.BlockRoots.String()).Inc() + runtime.SetFinalizer(b.blockRootsMultiValue, blockRootsFinalizer) + } + if b.stateRootsMultiValue != nil && b.stateRootsMultiValue.IsFragmented() { + initialMVslice := b.stateRootsMultiValue + b.stateRootsMultiValue = b.stateRootsMultiValue.Reset(b) + initialMVslice.Detach(b) + multiValueCountGauge.WithLabelValues(types.StateRoots.String()).Inc() + runtime.SetFinalizer(b.stateRootsMultiValue, stateRootsFinalizer) + } + if b.randaoMixesMultiValue != nil && b.randaoMixesMultiValue.IsFragmented() { + initialMVslice := b.randaoMixesMultiValue + b.randaoMixesMultiValue = b.randaoMixesMultiValue.Reset(b) + initialMVslice.Detach(b) + multiValueCountGauge.WithLabelValues(types.RandaoMixes.String()).Inc() + runtime.SetFinalizer(b.randaoMixesMultiValue, randaoMixesFinalizer) + } + if b.balancesMultiValue != nil && b.balancesMultiValue.IsFragmented() { + initialMVslice := b.balancesMultiValue + b.balancesMultiValue = b.balancesMultiValue.Reset(b) + initialMVslice.Detach(b) + multiValueCountGauge.WithLabelValues(types.Balances.String()).Inc() + runtime.SetFinalizer(b.balancesMultiValue, balancesFinalizer) + } + if b.validatorsMultiValue != nil && b.validatorsMultiValue.IsFragmented() { + initialMVslice := b.validatorsMultiValue + b.validatorsMultiValue = b.validatorsMultiValue.Reset(b) + initialMVslice.Detach(b) + multiValueCountGauge.WithLabelValues(types.Validators.String()).Inc() + runtime.SetFinalizer(b.validatorsMultiValue, validatorsFinalizer) + } + if b.inactivityScoresMultiValue != nil && b.inactivityScoresMultiValue.IsFragmented() { + initialMVslice := b.inactivityScoresMultiValue + b.inactivityScoresMultiValue = b.inactivityScoresMultiValue.Reset(b) + initialMVslice.Detach(b) + multiValueCountGauge.WithLabelValues(types.InactivityScores.String()).Inc() + runtime.SetFinalizer(b.inactivityScoresMultiValue, inactivityScoresFinalizer) + } +} + func randaoMixesFinalizer(m *MultiValueRandaoMixes) { multiValueCountGauge.WithLabelValues(types.RandaoMixes.String()).Dec() } diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 9e4c7621b82b..8ccb74dac1a5 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -606,7 +606,7 @@ func TestBlocksFetcher_WaitForBandwidth(t *testing.T) { p1.Connect(p2) require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") req := ðpb.BeaconBlocksByRangeRequest{ - Count: 64, + Count: 64, } topic := p2pm.RPCBlocksByRangeTopicV1 diff --git a/container/multi-value-slice/multi_value_slice.go b/container/multi-value-slice/multi_value_slice.go index 4149cd27aa11..81d8009fb692 100644 --- a/container/multi-value-slice/multi_value_slice.go +++ b/container/multi-value-slice/multi_value_slice.go @@ -96,6 +96,10 @@ import ( "github.com/pkg/errors" ) +// Amount of references beyond which a multivalue object is considered +// fragmented. +const fragmentationLimit = 50000 + // Id is an object identifier. type Id = uint64 @@ -424,6 +428,58 @@ func (s *Slice[V]) MultiValueStatistics() MultiValueStatistics { return stats } +// IsFragmented checks if our mutlivalue object is fragmented (individual references held). +// If the number of references is higher than our threshold we return true. +func (s *Slice[V]) IsFragmented() bool { + stats := s.MultiValueStatistics() + return stats.TotalIndividualElemReferences+stats.TotalAppendedElemReferences >= fragmentationLimit +} + +// Reset builds a new multivalue object with respect to the +// provided object's id. The base slice will be based on this +// particular id. +func (s *Slice[V]) Reset(obj Identifiable) *Slice[V] { + s.lock.RLock() + defer s.lock.RUnlock() + + l, ok := s.cachedLengths[obj.Id()] + if !ok { + l = len(s.sharedItems) + } + + items := make([]V, l) + copy(items, s.sharedItems) + for i, ind := range s.individualItems { + for _, v := range ind.Values { + _, found := containsId(v.ids, obj.Id()) + if found { + items[i] = v.val + break + } + } + } + + index := len(s.sharedItems) + for _, app := range s.appendedItems { + found := true + for _, v := range app.Values { + _, found = containsId(v.ids, obj.Id()) + if found { + items[index] = v.val + index++ + break + } + } + if !found { + break + } + } + + reset := &Slice[V]{} + reset.Init(items) + return reset +} + func (s *Slice[V]) fillOriginalItems(obj Identifiable, items *[]V) { for i, item := range s.sharedItems { ind, ok := s.individualItems[uint64(i)] diff --git a/container/multi-value-slice/multi_value_slice_test.go b/container/multi-value-slice/multi_value_slice_test.go index 0d19db6bfbd7..b5ecbefabb93 100644 --- a/container/multi-value-slice/multi_value_slice_test.go +++ b/container/multi-value-slice/multi_value_slice_test.go @@ -326,6 +326,156 @@ func TestDetach(t *testing.T) { assert.Equal(t, false, ok) } +func TestReset(t *testing.T) { + s := setup() + obj := &testObject{id: 2} + + reset := s.Reset(obj) + + assert.Equal(t, 8, len(reset.sharedItems)) + assert.Equal(t, 123, reset.sharedItems[0]) + assert.Equal(t, 2, reset.sharedItems[1]) + assert.Equal(t, 3, reset.sharedItems[2]) + assert.Equal(t, 123, reset.sharedItems[3]) + assert.Equal(t, 2, reset.sharedItems[4]) + assert.Equal(t, 2, reset.sharedItems[5]) + assert.Equal(t, 3, reset.sharedItems[6]) + assert.Equal(t, 2, reset.sharedItems[7]) + assert.Equal(t, 0, len(reset.individualItems)) + assert.Equal(t, 0, len(reset.appendedItems)) +} + +func TestFragmentation_IndividualReferences(t *testing.T) { + s := &Slice[int]{} + s.Init([]int{123, 123, 123, 123, 123}) + s.individualItems[1] = &MultiValueItem[int]{ + Values: []*Value[int]{ + { + val: 1, + ids: []uint64{1}, + }, + { + val: 2, + ids: []uint64{2}, + }, + }, + } + s.individualItems[2] = &MultiValueItem[int]{ + Values: []*Value[int]{ + { + val: 3, + ids: []uint64{1, 2}, + }, + }, + } + + numOfRefs := fragmentationLimit / 2 + for i := 3; i < numOfRefs; i++ { + obj := &testObject{id: uint64(i)} + s.Copy(&testObject{id: 1}, obj) + } + + assert.Equal(t, false, s.IsFragmented()) + + // Add more references to hit fragmentation limit. Id 1 + // has 2 references above. + for i := numOfRefs; i < numOfRefs+3; i++ { + obj := &testObject{id: uint64(i)} + s.Copy(&testObject{id: 1}, obj) + } + assert.Equal(t, true, s.IsFragmented()) +} + +func TestFragmentation_AppendedReferences(t *testing.T) { + s := &Slice[int]{} + s.Init([]int{123, 123, 123, 123, 123}) + s.appendedItems = []*MultiValueItem[int]{ + { + Values: []*Value[int]{ + { + val: 1, + ids: []uint64{1}, + }, + { + val: 2, + ids: []uint64{2}, + }, + }, + }, + { + Values: []*Value[int]{ + { + val: 3, + ids: []uint64{1, 2}, + }, + }, + }, + } + s.cachedLengths[1] = 7 + s.cachedLengths[2] = 8 + + numOfRefs := fragmentationLimit / 2 + for i := 3; i < numOfRefs; i++ { + obj := &testObject{id: uint64(i)} + s.Copy(&testObject{id: 1}, obj) + } + + assert.Equal(t, false, s.IsFragmented()) + + // Add more references to hit fragmentation limit. Id 1 + // has 2 references above. + for i := numOfRefs; i < numOfRefs+3; i++ { + obj := &testObject{id: uint64(i)} + s.Copy(&testObject{id: 1}, obj) + } + assert.Equal(t, true, s.IsFragmented()) +} + +func TestFragmentation_IndividualAndAppendedReferences(t *testing.T) { + s := &Slice[int]{} + s.Init([]int{123, 123, 123, 123, 123}) + s.individualItems[2] = &MultiValueItem[int]{ + Values: []*Value[int]{ + { + val: 3, + ids: []uint64{1, 2}, + }, + }, + } + s.appendedItems = []*MultiValueItem[int]{ + { + Values: []*Value[int]{ + { + val: 1, + ids: []uint64{1}, + }, + { + val: 2, + ids: []uint64{2}, + }, + }, + }, + } + s.cachedLengths[1] = 7 + s.cachedLengths[2] = 8 + + numOfRefs := fragmentationLimit / 2 + for i := 3; i < numOfRefs; i++ { + obj := &testObject{id: uint64(i)} + s.Copy(&testObject{id: 1}, obj) + } + + assert.Equal(t, false, s.IsFragmented()) + + // Add more references to hit fragmentation limit. Id 1 + // has 2 references above. + for i := numOfRefs; i < numOfRefs+3; i++ { + obj := &testObject{id: uint64(i)} + s.Copy(&testObject{id: 1}, obj) + } + assert.Equal(t, true, s.IsFragmented()) +} + // Share the slice between 2 objects. // Index 0: Shared value // Index 1: Different individual value