Skip to content

Commit

Permalink
Add GetReplicationSetForOperationWithNoQuorum ring method and use it …
Browse files Browse the repository at this point in the history
…in getShardedRules

Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Apr 4, 2024
1 parent 1507047 commit 4796a04
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pkg/compactor/shuffle_sharding_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,11 @@ func (r *RingMock) GetReplicationSetForOperation(op ring.Operation) (ring.Replic
return args.Get(0).(ring.ReplicationSet), args.Error(1)
}

// GetReplicationSetForOperationWithNoQuorum is the mock counterpart of the
// ring method with the same name. The call is recorded through r.Called and
// the configured return values supply the ReplicationSet (index 0) and the
// error (index 1). The set of zones with unhealthy instances is not
// configurable here: a fresh, empty map is always returned.
func (r *RingMock) GetReplicationSetForOperationWithNoQuorum(op ring.Operation) (ring.ReplicationSet, map[string]struct{}, error) {
	ret := r.Called(op)

	replicationSet := ret.Get(0).(ring.ReplicationSet)
	noFailedZones := map[string]struct{}{}

	return replicationSet, noFailedZones, ret.Error(1)
}

// ReplicationFactor always reports 0: this mock does not record the call and
// does not consult any configured return value.
func (r *RingMock) ReplicationFactor() int {
return 0
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ type ReadRing interface {
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperationWithNoQuorum returns all instances where the input operation should be executed.
// The resulting ReplicationSet contains all healthy instances in the ring, but MaxErrors is computed without
// requiring quorum, so only 1 replica is needed to complete the operation. MaxUnavailableZones is not
// automatically reduced when a zone has unhealthy instances, because the healthy instances in that zone are
// still returned; instead, the set of zones with unhealthy instances is returned alongside the ReplicationSet.
GetReplicationSetForOperationWithNoQuorum(op Operation) (ReplicationSet, map[string]struct{}, error)

ReplicationFactor() int

// InstancesCount returns the number of instances in the ring.
Expand Down Expand Up @@ -484,13 +491,12 @@ func (r *Ring) GetInstanceDescsForOperation(op Operation) (map[string]InstanceDe
return instanceDescs, nil
}

// GetReplicationSetForOperation implements ReadRing.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
func (r *Ring) getReplicationSetForOperation(op Operation, requireQuorum bool) (ReplicationSet, map[string]struct{}, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
return ReplicationSet{}, make(map[string]struct{}), ErrEmptyRing
}

// Build the initial replication set, excluding unhealthy instances.
Expand All @@ -512,18 +518,24 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
maxUnavailableZones := 0

if r.cfg.ZoneAwarenessEnabled {
// Given data is replicated to RF different zones, we can tolerate a number of
// RF/2 failing zones. However, we need to protect from the case the ring currently
// contains instances in a number of zones < RF.
numReplicatedZones := utilmath.Min(len(r.ringZones), r.cfg.ReplicationFactor)
minSuccessZones := (numReplicatedZones / 2) + 1
maxUnavailableZones = minSuccessZones - 1
if requireQuorum {
// Given data is replicated to RF different zones, we can tolerate a number of
// RF/2 failing zones. However, we need to protect from the case the ring currently
// contains instances in a number of zones < RF.
minSuccessZones := (numReplicatedZones / 2) + 1
maxUnavailableZones = minSuccessZones - 1
} else {
// Given that quorum is not required, we only need at least one of the zones to be healthy to succeed. But we
// also need to handle the case when RF < number of zones.
maxUnavailableZones = numReplicatedZones - 1
}

if len(zoneFailures) > maxUnavailableZones {
return ReplicationSet{}, ErrTooManyUnhealthyInstances
return ReplicationSet{}, zoneFailures, ErrTooManyUnhealthyInstances
}

if len(zoneFailures) > 0 {
if requireQuorum && len(zoneFailures) > 0 {
// We remove all instances (even healthy ones) from zones with at least
// 1 failing instance. Due to how replication works when zone-awareness is
// enabled (data is replicated to RF different zones), there's no benefit in
Expand All @@ -537,11 +549,11 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
}

healthyInstances = filteredInstances
}

// Since we removed all instances from zones containing at least 1 failing
// instance, we have to decrease the max unavailable zones accordingly.
maxUnavailableZones -= len(zoneFailures)
// Since we removed all instances from zones containing at least 1 failing
// instance, we have to decrease the max unavailable zones accordingly.
maxUnavailableZones -= len(zoneFailures)
}
} else {
// Calculate the number of required instances;
// ensure we always require at least RF-1 when RF=3.
Expand All @@ -550,10 +562,15 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
numRequired = r.cfg.ReplicationFactor
}
// We can tolerate this many failures
numRequired -= r.cfg.ReplicationFactor / 2
if requireQuorum {
numRequired -= r.cfg.ReplicationFactor / 2
} else {
// if quorum is not required then 1 replica is enough to handle the request
numRequired -= r.cfg.ReplicationFactor - 1
}

if len(healthyInstances) < numRequired {
return ReplicationSet{}, ErrTooManyUnhealthyInstances
return ReplicationSet{}, zoneFailures, ErrTooManyUnhealthyInstances
}

maxErrors = len(healthyInstances) - numRequired
Expand All @@ -563,7 +580,18 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro
Instances: healthyInstances,
MaxErrors: maxErrors,
MaxUnavailableZones: maxUnavailableZones,
}, nil
}, zoneFailures, nil
}

// GetReplicationSetForOperation implements ReadRing. It delegates to
// getReplicationSetForOperation with quorum required and returns only the
// resulting ReplicationSet and error.
func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, error) {
	const requireQuorum = true

	// The second result (the zone-failure set) is only exposed by the
	// no-quorum variant, so it is intentionally dropped here.
	set, _, err := r.getReplicationSetForOperation(op, requireQuorum)
	return set, err
}

// GetReplicationSetForOperationWithNoQuorum implements ReadRing. It delegates
// to getReplicationSetForOperation with quorum not required and forwards all
// three results (ReplicationSet, zone-failure set, error) unchanged.
func (r *Ring) GetReplicationSetForOperationWithNoQuorum(op Operation) (ReplicationSet, map[string]struct{}, error) {
	const requireQuorum = false

	set, zoneFailures, err := r.getReplicationSetForOperation(op, requireQuorum)
	return set, zoneFailures, err
}

// countTokens returns the number of tokens and tokens within the range for each instance.
Expand Down
5 changes: 5 additions & 0 deletions pkg/ring/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ func (r *RingMock) GetReplicationSetForOperation(op Operation) (ReplicationSet,
return args.Get(0).(ReplicationSet), args.Error(1)
}

// GetReplicationSetForOperationWithNoQuorum is the mock counterpart of the
// ring method with the same name. The call is recorded through r.Called and
// the configured return values supply the ReplicationSet (index 0) and the
// error (index 1). The set of zones with unhealthy instances is not
// configurable here: a fresh, empty map is always returned.
func (r *RingMock) GetReplicationSetForOperationWithNoQuorum(op Operation) (ReplicationSet, map[string]struct{}, error) {
	ret := r.Called(op)

	replicationSet := ret.Get(0).(ReplicationSet)
	noFailedZones := map[string]struct{}{}

	return replicationSet, noFailedZones, ret.Error(1)
}

// ReplicationFactor always reports 0: this mock does not record the call and
// does not consult any configured return value.
func (r *RingMock) ReplicationFactor() int {
return 0
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest
ring = r.ring.ShuffleShard(userID, shardSize)
}

rulers, err := ring.GetReplicationSetForOperation(ListRuleRingOp)
rulers, failedZones, err := ring.GetReplicationSetForOperationWithNoQuorum(ListRuleRingOp)
if err != nil {
return nil, err
}
Expand All @@ -1095,7 +1095,6 @@ func (r *Ruler) getShardedRules(ctx context.Context, userID string, rulesRequest
merged []*GroupStateDesc
errCount int
)
failedZones := make(map[string]struct{})

zoneByAddress := make(map[string]string)
if r.cfg.APIEnableRulesBackup {
Expand Down

0 comments on commit 4796a04

Please sign in to comment.