Redis waitgroup for optimistic block sync #538

Open
wants to merge 4 commits into base: main
70 changes: 70 additions & 0 deletions datastore/redis.go
@@ -24,6 +24,7 @@ var (
redisPrefix = "boost-relay"

expiryBidCache = 45 * time.Second
expiryLock = 24 * time.Second

RedisConfigFieldPubkey = "pubkey"
RedisStatsFieldLatestSlot = "latest-slot"
@@ -91,6 +92,7 @@ type RedisCache struct {
prefixTopBidValue string
prefixFloorBid string
prefixFloorBidValue string
prefixProcessingSlot string

// keys
keyValidatorRegistrationTimestamp string
@@ -101,6 +103,8 @@ type RedisCache struct {
keyBlockBuilderStatus string
keyLastSlotDelivered string
keyLastHashDelivered string

currentSlot uint64
}

func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
@@ -132,6 +136,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
prefixTopBidValue: fmt.Sprintf("%s/%s:top-bid-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
prefixFloorBid: fmt.Sprintf("%s/%s:bid-floor", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
prefixFloorBidValue: fmt.Sprintf("%s/%s:bid-floor-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
prefixProcessingSlot: fmt.Sprintf("%s/%s:processing-slot", redisPrefix, prefix), // prefix:slot

keyValidatorRegistrationTimestamp: fmt.Sprintf("%s/%s:validator-registration-timestamp", redisPrefix, prefix),
keyRelayConfig: fmt.Sprintf("%s/%s:relay-config", redisPrefix, prefix),
@@ -141,6 +146,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
keyLastHashDelivered: fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix),
currentSlot: 0,
}, nil
}

@@ -190,6 +196,11 @@ func (r *RedisCache) keyFloorBidValue(slot uint64, parentHash, proposerPubkey st
return fmt.Sprintf("%s:%d_%s_%s", r.prefixFloorBidValue, slot, parentHash, proposerPubkey)
}

// keyProcessingSlot returns the key for the counter of builder processes working on a given slot
func (r *RedisCache) keyProcessingSlot(slot uint64) string {
return fmt.Sprintf("%s:%d", r.prefixProcessingSlot, slot)
}

func (r *RedisCache) GetObj(key string, obj any) (err error) {
value, err := r.client.Get(context.Background(), key).Result()
if err != nil {
@@ -800,6 +811,65 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
return err
}

// BeginProcessingSlot signals that a builder process is handling blocks for a given slot
func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
// Should never process more than one slot at a time
if r.currentSlot != 0 {
return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113
}

keyProcessingSlot := r.keyProcessingSlot(slot)

pipe := r.client.TxPipeline()
pipe.Incr(ctx, keyProcessingSlot)
pipe.Expire(ctx, keyProcessingSlot, expiryLock)
_, err = pipe.Exec(ctx)

if err != nil {
return err
}

r.currentSlot = slot
return nil
}

// EndProcessingSlot signals that a builder process is done handling blocks for the current slot
func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
// Do not decrement if called multiple times
if r.currentSlot == 0 {
return nil
}

keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)

pipe := r.client.TxPipeline()
pipe.Decr(ctx, keyProcessingSlot)
pipe.Expire(ctx, keyProcessingSlot, expiryLock)
_, err = pipe.Exec(ctx)

if err != nil {
return err
}

r.currentSlot = 0
return nil
}
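Taken together, BeginProcessingSlot, EndProcessingSlot, and WaitForSlotComplete (defined just below) act like a cross-process sync.WaitGroup backed by a per-slot Redis counter. A minimal usage sketch, not part of this diff; the function names and the work callback are hypothetical:

```go
package datastore // sketch only; assumes it sits alongside RedisCache

import (
	"context"
	"log"
)

// builderSide is the worker half of the "redis waitgroup": it bumps the
// per-slot counter while this process handles optimistic submissions.
func builderSide(ctx context.Context, r *RedisCache, slot uint64, work func()) error {
	if err := r.BeginProcessingSlot(ctx, slot); err != nil { // INCR + EXPIRE
		return err
	}
	defer func() {
		// Always DECR; the 24s key expiry is the backstop if a process dies first.
		if err := r.EndProcessingSlot(ctx); err != nil {
			log.Printf("failed to release processing-slot counter: %v", err)
		}
	}()
	work() // e.g. simulate/verify optimistic block submissions for this slot
	return nil
}

// waiterSide is the other half: it blocks until every builder process has
// released the slot (counter back to zero) or the key has expired.
func waiterSide(ctx context.Context, r *RedisCache, slot uint64) error {
	return r.WaitForSlotComplete(ctx, slot)
}
```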

// WaitForSlotComplete waits for a slot to be completed by all builder processes
func (r *RedisCache) WaitForSlotComplete(ctx context.Context, slot uint64) (err error) {
keyProcessingSlot := r.keyProcessingSlot(slot)
for {
Collaborator:
Wondering if there should be a timeout in case EndProcessingSlot fails to decrement the processing slot and the loop is stuck waiting for the slot to complete.

Contributor (author):
The current failsafe is that the key is slot-specific and has a fairly short expiry. The loop will break on a value of 0, on key expiry, or on any other Redis error. But it probably wouldn't hurt to have an explicit timeout on the mev-boost-relay side. I'll whip something up.

We could also consider keyspace events (https://redis.io/docs/manual/keyspace-notifications/) as a cleaner way to accomplish this, but that might require some Redis server config.

processing, err := r.client.Get(ctx, keyProcessingSlot).Uint64()
if err != nil {
return err
}
if processing == 0 {
return nil
}
time.Sleep(50 * time.Millisecond)
}
}
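Following up on the thread above, a minimal sketch of how a caller could put an explicit deadline around this wait, so that a counter that is never decremented cannot stall it until the 24-second key expiry. This is hypothetical and not part of the PR; waitForSlotCompleteBounded and the chosen timeout are illustrative:

```go
package api // sketch: would live next to the handlers in services/api

import (
	"context"
	"errors"
	"time"

	"github.com/flashbots/mev-boost-relay/datastore"
)

// waitForSlotCompleteBounded bounds WaitForSlotComplete with a deadline.
// go-redis returns a context error once the deadline passes, which breaks
// the polling loop inside WaitForSlotComplete.
func waitForSlotCompleteBounded(r *datastore.RedisCache, slot uint64, d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()

	err := r.WaitForSlotComplete(ctx, slot)
	if errors.Is(err, context.DeadlineExceeded) {
		return nil // treat a timeout as "complete enough" rather than blocking the caller
	}
	return err
}
```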

func (r *RedisCache) NewPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint
return r.client.Pipeline()
}
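As an aside on the keyspace-notifications idea floated in the review thread above, here is a rough sketch of what an event-driven wait could look like instead of polling. It is not part of this PR, assumes DB 0, requires the Redis server to run with notify-keyspace-events enabled (e.g. "KEA"), and the function name and go-redis import path are illustrative:

```go
package datastore

import (
	"context"
	"errors"

	"github.com/redis/go-redis/v9" // or whichever go-redis version the repo pins
)

// waitViaKeyspaceEvents re-checks the per-slot counter only when Redis
// reports activity on the key, instead of polling every 50ms.
func waitViaKeyspaceEvents(ctx context.Context, rdb *redis.Client, key string) error {
	sub := rdb.Subscribe(ctx, "__keyspace@0__:"+key)
	defer sub.Close()

	check := func() (done bool, err error) {
		n, err := rdb.Get(ctx, key).Uint64()
		if errors.Is(err, redis.Nil) {
			return true, nil // key expired or deleted: treat the slot as complete
		}
		return n == 0, err
	}

	// Check once up front in case the counter is already zero.
	if done, err := check(); done || err != nil {
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-sub.Channel(): // payloads are event names such as "incrby", "decrby", "expired"
			if done, err := check(); done || err != nil {
				return err
			}
		}
	}
}
```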
2 changes: 1 addition & 1 deletion services/api/optimistic_test.go
@@ -358,7 +358,7 @@ func TestPrepareBuildersForSlot(t *testing.T) {
pkStr := pubkey.String()
// Clear cache.
backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{}
backend.relay.prepareBuildersForSlot(slot + 1)
backend.relay.prepareBuildersForSlot(slot+1, slot)
entry, ok := backend.relay.blockBuildersCache[pkStr]
require.True(t, ok)
require.Equal(t, true, entry.status.IsHighPrio)
50 changes: 43 additions & 7 deletions services/api/service.go
@@ -516,6 +516,7 @@ func (api *RelayAPI) IsReady() bool {
// - Stop returning bids
// - Set ready /readyz to negative status
// - Wait a bit to allow removal of service from load balancer and draining of requests
// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock
func (api *RelayAPI) StopServer() (err error) {
// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
if wasStopping := api.srvShutdown.Swap(true); wasStopping {
@@ -538,6 +539,13 @@ func (api *RelayAPI) StopServer() (err error) {
// wait for any active getPayload call to finish
api.getPayloadCallsInFlight.Wait()

// wait for optimistic blocks
api.optimisticBlocksWG.Wait()
err = api.redis.EndProcessingSlot(context.Background())
if err != nil {
api.log.WithError(err).Error("failed to update redis optimistic processing slot")
}

// shutdown
return api.srv.Shutdown(context.Background())
}
@@ -754,15 +762,19 @@ func (api *RelayAPI) processNewSlot(headSlot uint64) {
// store the head slot
api.headSlot.Store(headSlot)

// only for builder-api
// for both apis
if api.opts.BlockBuilderAPI || api.opts.ProposerAPI {
// update proposer duties in the background
go api.updateProposerDuties(headSlot)
}

// for block builder api
if api.opts.BlockBuilderAPI {
// update the optimistic slot
go api.prepareBuildersForSlot(headSlot)
go api.prepareBuildersForSlot(headSlot, prevHeadSlot)
}

// for proposer api
if api.opts.ProposerAPI {
go api.datastore.RefreshKnownValidators(api.log, api.beaconClient, headSlot)
}
@@ -825,11 +837,32 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", "))
}

func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64) {
// Wait until there are no optimistic blocks being processed. Then we can
// safely update the slot.
func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
// First wait for this process to finish processing optimistic blocks
api.optimisticBlocksWG.Wait()

// Now we release our lock and wait for all other builder processes to wrap up
err := api.redis.EndProcessingSlot(context.Background())
if err != nil {
api.log.WithError(err).Error("failed to unlock redis optimistic processing slot")
}
err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
if err != nil {
api.log.WithError(err).Error("failed to get redis optimistic processing slot")
}

// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
if api.srvShutdown.Load() {
return
}

// Update the optimistic slot and signal processing of the next slot
api.optimisticSlot.Store(headSlot + 1)
err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
if err != nil {
api.log.WithError(err).Error("failed to lock redis optimistic processing slot")
api.optimisticSlot.Store(0)
Collaborator:
Why is the optimistic slot set to 0 if Redis fails to process the slot, instead of to the previous head?

Contributor (author):
The only place api.optimisticSlot is used is in deciding whether or not to process a submission optimistically:

submission.BidTrace.Slot == api.optimisticSlot.Load() {

We only reach this BeginProcessingSlot call once we've received a head event for the previous slot and all optimistic blocks for that slot have finished processing:

api.processNewSlot(headEvent.Slot)

So the only valid block submissions we receive at this point are those with submission.BidTrace.Slot == headSlot+1. Whether we run api.optimisticSlot.Store(0) or api.optimisticSlot.Store(prevHeadSlot), neither will match the submission.BidTrace.Slot of new incoming blocks. Either way, the end result is that if the redis connection fails, the process will not allow optimistic submissions for the upcoming slot.

So while there are a few options for the code flow, the current design sets it to headSlot + 1 by default, and resets it to 0 as a general "disable all optimistic processing" in case of error.

}

builders, err := api.db.GetBlockBuilders()
if err != nil {
@@ -1383,8 +1416,11 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
log.WithError(err).Error("failed to increment builder-stats after getPayload")
}

// Wait until optimistic blocks are complete.
api.optimisticBlocksWG.Wait()
// Wait until optimistic blocks are complete using the redis waitgroup
err = api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
if err != nil {
api.log.WithError(err).Error("failed to get redis optimistic processing slot")
}

// Check if there is a demotion for the winning block.
_, err = api.db.GetBuilderDemotion(bidTrace)