diff --git a/datastore/redis.go b/datastore/redis.go
index f1f0e9b9..13363808 100644
--- a/datastore/redis.go
+++ b/datastore/redis.go
@@ -24,6 +24,7 @@ var (
 	redisPrefix = "boost-relay"
 
 	expiryBidCache = 45 * time.Second
+	expiryLock     = 24 * time.Second
 
 	RedisConfigFieldPubkey    = "pubkey"
 	RedisStatsFieldLatestSlot = "latest-slot"
@@ -91,6 +92,7 @@ type RedisCache struct {
 	prefixTopBidValue    string
 	prefixFloorBid       string
 	prefixFloorBidValue  string
+	prefixProcessingSlot string
 
 	// keys
 	keyValidatorRegistrationTimestamp string
@@ -101,6 +103,8 @@ type RedisCache struct {
 	keyBlockBuilderStatus string
 	keyLastSlotDelivered  string
 	keyLastHashDelivered  string
+
+	currentSlot uint64
 }
 
 func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
@@ -132,6 +136,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
 		prefixTopBidValue:    fmt.Sprintf("%s/%s:top-bid-value", redisPrefix, prefix),   // prefix:slot_parentHash_proposerPubkey
 		prefixFloorBid:       fmt.Sprintf("%s/%s:bid-floor", redisPrefix, prefix),       // prefix:slot_parentHash_proposerPubkey
 		prefixFloorBidValue:  fmt.Sprintf("%s/%s:bid-floor-value", redisPrefix, prefix), // prefix:slot_parentHash_proposerPubkey
+		prefixProcessingSlot: fmt.Sprintf("%s/%s:processing-slot", redisPrefix, prefix), // prefix:slot
 
 		keyValidatorRegistrationTimestamp: fmt.Sprintf("%s/%s:validator-registration-timestamp", redisPrefix, prefix),
 		keyRelayConfig:                    fmt.Sprintf("%s/%s:relay-config", redisPrefix, prefix),
@@ -141,6 +146,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) {
 		keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix),
 		keyLastSlotDelivered:  fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix),
 		keyLastHashDelivered:  fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix),
+		currentSlot:           0,
 	}, nil
 }
 
@@ -190,6 +196,11 @@ func (r *RedisCache) keyFloorBidValue(slot uint64, parentHash, proposerPubkey st
 	return fmt.Sprintf("%s:%d_%s_%s", r.prefixFloorBidValue, slot, parentHash, proposerPubkey)
 }
 
+// keyProcessingSlot returns the key for the counter of builder processes working on a given slot
+func (r *RedisCache) keyProcessingSlot(slot uint64) string {
+	return fmt.Sprintf("%s:%d", r.prefixProcessingSlot, slot)
+}
+
 func (r *RedisCache) GetObj(key string, obj any) (err error) {
 	value, err := r.client.Get(context.Background(), key).Result()
 	if err != nil {
@@ -800,6 +811,65 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v
 	return err
 }
 
+// BeginProcessingSlot signals that a builder process is handling blocks for a given slot
+func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) {
+	// Should never process more than one slot at a time
+	if r.currentSlot != 0 {
+		return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113
+	}
+
+	keyProcessingSlot := r.keyProcessingSlot(slot)
+
+	pipe := r.client.TxPipeline()
+	pipe.Incr(ctx, keyProcessingSlot)
+	pipe.Expire(ctx, keyProcessingSlot, expiryLock)
+	_, err = pipe.Exec(ctx)
+
+	if err != nil {
+		return err
+	}
+
+	r.currentSlot = slot
+	return nil
+}
+
+// EndProcessingSlot signals that a builder process is done handling blocks for the current slot
+func (r *RedisCache) EndProcessingSlot(ctx context.Context) (err error) {
+	// Do not decrement if called multiple times
+	if r.currentSlot == 0 {
+		return nil
+	}
+
+	keyProcessingSlot := r.keyProcessingSlot(r.currentSlot)
+
+	pipe := r.client.TxPipeline()
+	pipe.Decr(ctx, keyProcessingSlot)
+	pipe.Expire(ctx, keyProcessingSlot, expiryLock)
+	_, err = pipe.Exec(ctx)
+
+	if err != nil {
+		return err
+	}
+
+	r.currentSlot = 0
+	return nil
+}
+
+// WaitForSlotComplete waits for a slot to be completed by all builder processes
+func (r *RedisCache) WaitForSlotComplete(ctx context.Context, slot uint64) (err error) {
+	keyProcessingSlot := r.keyProcessingSlot(slot)
+	for {
+		processing, err := r.client.Get(ctx, keyProcessingSlot).Uint64()
+		if err != nil {
+			return err
+		}
+		if processing == 0 {
+			return nil
+		}
+		time.Sleep(50 * time.Millisecond)
+	}
+}
+
 func (r *RedisCache) NewPipeline() redis.Pipeliner { //nolint:ireturn,nolintlint
 	return r.client.Pipeline()
 }
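// BeginProcessingSlot, EndProcessingSlot, and WaitForSlotComplete above form a
// cross-process waitgroup on a Redis counter: every relay process INCRs the
// per-slot key when it starts handling blocks, DECRs it when done, and
// refreshes the 24s TTL (expiryLock) on each write so a crashed process cannot
// hold the lock forever. Below is a minimal, self-contained sketch of that
// pattern, not the relay's code: it assumes a local Redis at localhost:6379
// and the github.com/redis/go-redis/v9 client; the key name is illustrative.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	key := "processing-slot:123" // hypothetical key, mirrors keyProcessingSlot(123)

	// Begin: increment the counter and (re)arm its expiry in one transaction.
	pipe := client.TxPipeline()
	pipe.Incr(ctx, key)
	pipe.Expire(ctx, key, 24*time.Second)
	if _, err := pipe.Exec(ctx); err != nil {
		panic(err)
	}

	// ... process blocks for the slot ...

	// End: decrement; the counter reaches 0 once every process has finished.
	if err := client.Decr(ctx, key).Err(); err != nil {
		panic(err)
	}

	// Wait: poll until the counter is 0 (or the key has expired away).
	for {
		n, err := client.Get(ctx, key).Uint64()
		if errors.Is(err, redis.Nil) || (err == nil && n == 0) {
			break
		}
		if err != nil {
			panic(err)
		}
		time.Sleep(50 * time.Millisecond)
	}
	fmt.Println("slot complete")
}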
diff --git a/services/api/optimistic_test.go b/services/api/optimistic_test.go
index b7a35b80..e9b33e30 100644
--- a/services/api/optimistic_test.go
+++ b/services/api/optimistic_test.go
@@ -358,7 +358,7 @@ func TestPrepareBuildersForSlot(t *testing.T) {
 	pkStr := pubkey.String()
 	// Clear cache.
 	backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{}
-	backend.relay.prepareBuildersForSlot(slot + 1)
+	backend.relay.prepareBuildersForSlot(slot+1, slot)
 	entry, ok := backend.relay.blockBuildersCache[pkStr]
 	require.True(t, ok)
 	require.Equal(t, true, entry.status.IsHighPrio)
diff --git a/services/api/service.go b/services/api/service.go
index 9dea594f..96b56693 100644
--- a/services/api/service.go
+++ b/services/api/service.go
@@ -516,6 +516,7 @@ func (api *RelayAPI) IsReady() bool {
 // - Stop returning bids
 // - Set ready /readyz to negative status
 // - Wait a bit to allow removal of service from load balancer and draining of requests
+// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock
 func (api *RelayAPI) StopServer() (err error) {
 	// avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status
 	if wasStopping := api.srvShutdown.Swap(true); wasStopping {
@@ -538,6 +539,13 @@ func (api *RelayAPI) StopServer() (err error) {
 	// wait for any active getPayload call to finish
 	api.getPayloadCallsInFlight.Wait()
 
+	// wait for optimistic blocks
+	api.optimisticBlocksWG.Wait()
+	err = api.redis.EndProcessingSlot(context.Background())
+	if err != nil {
+		api.log.WithError(err).Error("failed to update redis optimistic processing slot")
+	}
+
 	// shutdown
 	return api.srv.Shutdown(context.Background())
 }
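// StopServer now drains in a fixed order: flip srvShutdown first (turning
// /readyz negative and, in the prepareBuildersForSlot change below, making the
// slot handoff bail out), wait for in-flight getPayload calls and optimistic
// blocks, release the redis slot lock, then shut the HTTP server down. A toy
// sketch of that idempotent-shutdown shape, with illustrative names rather
// than the relay's API:
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type server struct {
	shuttingDown atomic.Bool    // plays the role of api.srvShutdown
	inFlight     sync.WaitGroup // plays the role of the two waitgroups
}

func (s *server) stop() {
	// Swap returns the previous value, so only the first caller drains.
	if wasStopping := s.shuttingDown.Swap(true); wasStopping {
		return
	}
	// Drain work admitted before the flag flipped...
	s.inFlight.Wait()
	// ...then release shared state (EndProcessingSlot in the real code) and shut down.
	fmt.Println("lock released, shutting down")
}

func main() {
	s := &server{}
	s.inFlight.Add(1)
	go func() { defer s.inFlight.Done() }() // simulated in-flight request
	s.stop()
}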
@@ -754,15 +762,19 @@ func (api *RelayAPI) processNewSlot(headSlot uint64) {
 	// store the head slot
 	api.headSlot.Store(headSlot)
 
-	// only for builder-api
+	// for both apis
 	if api.opts.BlockBuilderAPI || api.opts.ProposerAPI {
 		// update proposer duties in the background
 		go api.updateProposerDuties(headSlot)
+	}
 
+	// for block builder api
+	if api.opts.BlockBuilderAPI {
 		// update the optimistic slot
-		go api.prepareBuildersForSlot(headSlot)
+		go api.prepareBuildersForSlot(headSlot, prevHeadSlot)
 	}
 
+	// for proposer api
 	if api.opts.ProposerAPI {
 		go api.datastore.RefreshKnownValidators(api.log, api.beaconClient, headSlot)
 	}
@@ -825,11 +837,32 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) {
 	api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", "))
 }
 
-func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64) {
-	// Wait until there are no optimistic blocks being processed. Then we can
-	// safely update the slot.
+func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) {
+	// First wait for this process to finish processing optimistic blocks
 	api.optimisticBlocksWG.Wait()
+
+	// Now we release our lock and wait for all other builder processes to wrap up
+	err := api.redis.EndProcessingSlot(context.Background())
+	if err != nil {
+		api.log.WithError(err).Error("failed to unlock redis optimistic processing slot")
+	}
+	err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1)
+	if err != nil {
+		api.log.WithError(err).Error("failed to get redis optimistic processing slot")
+	}
+
+	// Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down
+	if api.srvShutdown.Load() {
+		return
+	}
+
+	// Update the optimistic slot and signal processing of the next slot
 	api.optimisticSlot.Store(headSlot + 1)
+	err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1)
+	if err != nil {
+		api.log.WithError(err).Error("failed to lock redis optimistic processing slot")
+		api.optimisticSlot.Store(0)
+	}
 
 	builders, err := api.db.GetBlockBuilders()
 	if err != nil {
@@ -1383,8 +1416,11 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request)
 		log.WithError(err).Error("failed to increment builder-stats after getPayload")
 	}
 
-	// Wait until optimistic blocks are complete.
-	api.optimisticBlocksWG.Wait()
+	// Wait until optimistic blocks are complete using the redis waitgroup
+	err = api.redis.WaitForSlotComplete(context.Background(), uint64(slot))
+	if err != nil {
+		api.log.WithError(err).Error("failed to get redis optimistic processing slot")
+	}
 
 	// Check if there is a demotion for the winning block.
 	_, err = api.db.GetBuilderDemotion(bidTrace)
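// Taken together, each new head slot performs a handoff: release our count on
// the previous slot, wait for every other relay process to release theirs,
// then, unless shutting down, claim the next slot. Releasing before waiting
// matters: our own increment would otherwise keep the counter above zero and
// WaitForSlotComplete could never return. The sketch below mirrors that
// ordering against the three methods added in this diff; handoff and fakeLock
// are illustrative names, and the in-memory fake stands in for RedisCache so
// the example runs standalone.
package main

import (
	"context"
	"fmt"
)

// slotLock captures just the three RedisCache methods introduced in this diff.
type slotLock interface {
	BeginProcessingSlot(ctx context.Context, slot uint64) error
	EndProcessingSlot(ctx context.Context) error
	WaitForSlotComplete(ctx context.Context, slot uint64) error
}

// handoff mirrors the locking order used by prepareBuildersForSlot.
func handoff(ctx context.Context, lock slotLock, headSlot, prevHeadSlot uint64, shuttingDown func() bool) error {
	if err := lock.EndProcessingSlot(ctx); err != nil {
		return err
	}
	if err := lock.WaitForSlotComplete(ctx, prevHeadSlot+1); err != nil {
		return err
	}
	if shuttingDown() {
		return nil // StopServer owns the cleanup from here
	}
	return lock.BeginProcessingSlot(ctx, headSlot+1)
}

// fakeLock is a single-process stand-in for RedisCache.
type fakeLock struct{ current uint64 }

func (f *fakeLock) BeginProcessingSlot(_ context.Context, slot uint64) error {
	f.current = slot
	return nil
}

func (f *fakeLock) EndProcessingSlot(_ context.Context) error {
	f.current = 0
	return nil
}

func (f *fakeLock) WaitForSlotComplete(_ context.Context, _ uint64) error { return nil }

func main() {
	lock := &fakeLock{}
	if err := handoff(context.Background(), lock, 101, 100, func() bool { return false }); err != nil {
		panic(err)
	}
	fmt.Println("now processing slot", lock.current) // prints 102
}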