Skip to content

Commit

Permalink
Revert "Replace validator wait for activation stream with polling (#1…
Browse files Browse the repository at this point in the history
…4514)"

This reverts commit 6c22ede.
  • Loading branch information
james-prysm committed Nov 22, 2024
1 parent 470581e commit 775a694
Show file tree
Hide file tree
Showing 18 changed files with 1,263 additions and 543 deletions.
1 change: 0 additions & 1 deletion beacon-chain/rpc/prysm/v1alpha1/validator/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type Server struct {
// WaitForActivation checks if a validator public key exists in the active validator registry of the current
// beacon state, if not, then it creates a stream which listens for canonical states which contain
// the validator with the public key as an active validator record.
// Deprecated: do not use, just poll validator status every epoch.
func (vs *Server) WaitForActivation(req *ethpb.ValidatorActivationRequest, stream ethpb.BeaconNodeValidator_WaitForActivationServer) error {
activeValidatorExists, validatorStatuses, err := vs.activationStatus(stream.Context(), req.PublicKeys)
if err != nil {
Expand Down
567 changes: 282 additions & 285 deletions proto/prysm/v1alpha1/validator.pb.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion proto/prysm/v1alpha1/validator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ service BeaconNodeValidator {
option (google.api.http) = {
get: "/eth/v1alpha1/validator/activation/stream"
};
option deprecated = true;
}

// ValidatorIndex retrieves a validator's index location in the beacon state's
Expand Down
123 changes: 123 additions & 0 deletions testing/mock/beacon_validator_client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 0 additions & 23 deletions time/slots/slottime.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
mathutil "github.com/prysmaticlabs/prysm/v5/math"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/sirupsen/logrus"
)

// MaxSlotBuffer specifies the max buffer given to slots from
Expand Down Expand Up @@ -287,25 +286,3 @@ func WithinVotingWindow(genesisTime uint64, slot primitives.Slot) bool {
func MaxSafeEpoch() primitives.Epoch {
return primitives.Epoch(math.MaxUint64 / uint64(params.BeaconConfig().SlotsPerEpoch))
}

// SecondsUntilNextEpochStart returns how many seconds until the next Epoch start from the current time and slot
func SecondsUntilNextEpochStart(genesisTimeSec uint64) (uint64, error) {
currentSlot := CurrentSlot(genesisTimeSec)
firstSlotOfNextEpoch, err := EpochStart(ToEpoch(currentSlot) + 1)
if err != nil {
return 0, err
}
nextEpochStartTime, err := ToTime(genesisTimeSec, firstSlotOfNextEpoch)
if err != nil {
return 0, err
}
es := nextEpochStartTime.Unix()
n := time.Now().Unix()
waitTime := uint64(es - n)
log.WithFields(logrus.Fields{
"current_slot": currentSlot,
"next_epoch_start_slot": firstSlotOfNextEpoch,
"is_epoch_start": IsEpochStart(currentSlot),
}).Debugf("%d seconds until next epoch", waitTime)
return waitTime, nil
}
25 changes: 0 additions & 25 deletions time/slots/slottime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,28 +607,3 @@ func TestWithinVotingWindow(t *testing.T) {
genesisTime = uint64(time.Now().Add(-40 * time.Second).Unix())
require.Equal(t, false, WithinVotingWindow(genesisTime, 3))
}

func TestSecondsUntilNextEpochStart(t *testing.T) {
secondsInEpoch := uint64(params.BeaconConfig().SlotsPerEpoch) * params.BeaconConfig().SecondsPerSlot
// try slot 3
genesisTime := uint64(time.Now().Add(-39 * time.Second).Unix())
waitTime, err := SecondsUntilNextEpochStart(genesisTime)
require.NoError(t, err)
require.Equal(t, secondsInEpoch-(params.BeaconConfig().SecondsPerSlot*3)-3, waitTime)
// try slot 34
genesisTime = uint64(time.Now().Add(time.Duration(-1*int(secondsInEpoch)-int(params.BeaconConfig().SecondsPerSlot*2)-5) * time.Second).Unix())
waitTime, err = SecondsUntilNextEpochStart(genesisTime)
require.NoError(t, err)
require.Equal(t, secondsInEpoch-(params.BeaconConfig().SecondsPerSlot*2)-5, waitTime)

// check if waitTime is correctly EpochStart
n := time.Now().Add(-39 * time.Second)
genesisTime = uint64(n.Unix())
waitTime, err = SecondsUntilNextEpochStart(genesisTime)
require.NoError(t, err)
require.Equal(t, secondsInEpoch-39, waitTime)
newGenesisTime := uint64(n.Add(time.Duration(-1*int(waitTime)) * time.Second).Unix())
currentSlot := CurrentSlot(newGenesisTime)
require.Equal(t, true, IsEpochStart(currentSlot))

}
2 changes: 2 additions & 0 deletions validator/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ go_test(
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/mock:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"//testing/validator-mock:go_default_library",
Expand All @@ -168,6 +169,7 @@ go_test(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@com_github_stretchr_testify//mock:go_default_library",
"@com_github_tyler_smith_go_bip39//:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
"@com_github_wealdtech_go_eth2_util//:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions validator/client/beacon-api/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"activation.go",
"attestation_data.go",
"beacon_api_beacon_chain_client.go",
"beacon_api_helpers.go",
Expand Down Expand Up @@ -74,6 +75,7 @@ go_test(
name = "go_default_test",
size = "small",
srcs = [
"activation_test.go",
"attestation_data_test.go",
"beacon_api_beacon_chain_client_test.go",
"beacon_api_helpers_test.go",
Expand Down
121 changes: 121 additions & 0 deletions validator/client/beacon-api/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package beacon_api

import (
"context"
"strconv"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"google.golang.org/grpc"
)

func (c *beaconApiValidatorClient) waitForActivation(ctx context.Context, in *ethpb.ValidatorActivationRequest) (ethpb.BeaconNodeValidator_WaitForActivationClient, error) {
return &waitForActivationClient{
ctx: ctx,
beaconApiValidatorClient: c,
ValidatorActivationRequest: in,
}, nil
}

type waitForActivationClient struct {
grpc.ClientStream
ctx context.Context
*beaconApiValidatorClient
*ethpb.ValidatorActivationRequest
lastRecvTime time.Time
}

func computeWaitElements(now, lastRecvTime time.Time) (time.Duration, time.Time) {
nextRecvTime := lastRecvTime.Add(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)

if lastRecvTime.IsZero() {
nextRecvTime = now
}

if nextRecvTime.Before(now) {
return time.Duration(0), now
}

return nextRecvTime.Sub(now), nextRecvTime
}

func (c *waitForActivationClient) Recv() (*ethpb.ValidatorActivationResponse, error) {
waitDuration, nextRecvTime := computeWaitElements(time.Now(), c.lastRecvTime)

select {
case <-time.After(waitDuration):
c.lastRecvTime = nextRecvTime

// Represents the target set of keys
stringTargetPubKeysToPubKeys := make(map[string][]byte, len(c.ValidatorActivationRequest.PublicKeys))
stringTargetPubKeys := make([]string, len(c.ValidatorActivationRequest.PublicKeys))

// Represents the set of keys actually returned by the beacon node
stringRetrievedPubKeys := make(map[string]struct{})

// Contains all keys in targetPubKeys but not in retrievedPubKeys
var missingPubKeys [][]byte

var statuses []*ethpb.ValidatorActivationResponse_Status

for index, publicKey := range c.ValidatorActivationRequest.PublicKeys {
stringPubKey := hexutil.Encode(publicKey)
stringTargetPubKeysToPubKeys[stringPubKey] = publicKey
stringTargetPubKeys[index] = stringPubKey
}

stateValidators, err := c.stateValidatorsProvider.StateValidators(c.ctx, stringTargetPubKeys, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "failed to get state validators")
}

for _, data := range stateValidators.Data {
pubkey, err := hexutil.Decode(data.Validator.Pubkey)
if err != nil {
return nil, errors.Wrap(err, "failed to parse validator public key")
}

stringRetrievedPubKeys[data.Validator.Pubkey] = struct{}{}

index, err := strconv.ParseUint(data.Index, 10, 64)
if err != nil {
return nil, errors.Wrap(err, "failed to parse validator index")
}

validatorStatus, ok := beaconAPITogRPCValidatorStatus[data.Status]
if !ok {
return nil, errors.New("invalid validator status: " + data.Status)
}

statuses = append(statuses, &ethpb.ValidatorActivationResponse_Status{
PublicKey: pubkey,
Index: primitives.ValidatorIndex(index),
Status: &ethpb.ValidatorStatusResponse{Status: validatorStatus},
})
}

for stringTargetPubKey, targetPubKey := range stringTargetPubKeysToPubKeys {
if _, ok := stringRetrievedPubKeys[stringTargetPubKey]; !ok {
missingPubKeys = append(missingPubKeys, targetPubKey)
}
}

for _, missingPubKey := range missingPubKeys {
statuses = append(statuses, &ethpb.ValidatorActivationResponse_Status{
PublicKey: missingPubKey,
Index: primitives.ValidatorIndex(^uint64(0)),
Status: &ethpb.ValidatorStatusResponse{Status: ethpb.ValidatorStatus_UNKNOWN_STATUS},
})
}

return &ethpb.ValidatorActivationResponse{
Statuses: statuses,
}, nil
case <-c.ctx.Done():
return nil, errors.New("context canceled")
}
}
Loading

0 comments on commit 775a694

Please sign in to comment.