Skip to content

Commit

Permalink
Calculate current ownership when generating tokens for an ingester th…
Browse files Browse the repository at this point in the history
…at already have tokens
  • Loading branch information
alanprot committed Jul 5, 2024
1 parent b26bc82 commit 009f61e
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
* [BUGFIX] Ingester: Fix issue with the minimize token generator where it was not taking in consideration the current ownerhip of an instance when generating extra tokens. #6062

## 1.17.1 2024-05-20

Expand Down
25 changes: 20 additions & 5 deletions pkg/ring/token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,15 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
}

// Only take in consideration tokens from instances in the same AZ
if i != id && instance.Zone == zone {
instanceTokens = append(instanceTokens, instance.Tokens)
if instance.Zone != zone {
continue
}

instanceTokens = append(instanceTokens, instance.Tokens)

// Do not add the current instance on the tokensPerInstanceWithDistance map as it will be used to create the heap
// to calculate from what instance we should take ownership
if i != id {
tokensPerInstanceWithDistance[i] = &totalTokenPerInstance{id: i, zone: instance.Zone}

if len(instance.Tokens) == 0 {
Expand All @@ -117,6 +124,7 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
}

zonalTokens := MergeTokens(instanceTokens)
currentInstance := &totalTokenPerInstance{id: id, zone: zone}

// If we don't have tokens to split, lets create the tokens randomly
if len(zonalTokens) == 0 {
Expand All @@ -127,14 +135,22 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin
// This map will be later on used to create the heap in order to take tokens from the ingesters with most distance
for i := 1; i <= len(zonalTokens); i++ {
index := i % len(zonalTokens)
if id, ok := usedTokens[zonalTokens[index]]; ok {
instanceDistance := tokensPerInstanceWithDistance[id]
if tokenInstanceId, ok := usedTokens[zonalTokens[index]]; ok && tokenInstanceId != id {
instanceDistance := tokensPerInstanceWithDistance[tokenInstanceId]
instanceDistance.tokens = append(instanceDistance.tokens, &tokenDistanceEntry{
token: zonalTokens[index],
prev: zonalTokens[i-1],
distance: tokenDistance(zonalTokens[i-1], zonalTokens[index]),
})
instanceDistance.totalDistance += tokenDistance(zonalTokens[i-1], zonalTokens[index])
} else if tokenInstanceId == id {
// If the token is owned by the current instance, lets calculate the current distance
currentInstance.tokens = append(currentInstance.tokens, &tokenDistanceEntry{
token: zonalTokens[index],
prev: zonalTokens[i-1],
distance: tokenDistance(zonalTokens[i-1], zonalTokens[index]),
})
currentInstance.totalDistance += tokenDistance(zonalTokens[i-1], zonalTokens[index])
}
}

Expand All @@ -149,7 +165,6 @@ func (g *MinimizeSpreadTokenGenerator) GenerateTokens(ring *Desc, id, zone strin

heap.Init(distancesHeap)

currentInstance := &totalTokenPerInstance{id: id, zone: zone}
expectedOwnership := float64(1) / (float64(len(tokensPerInstanceWithDistance) + 1))
expectedOwnershipDistance := int64(expectedOwnership * maxTokenValue)

Expand Down
8 changes: 8 additions & 0 deletions pkg/ring/token_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ func TestMinimizeSpreadTokenGenerator(t *testing.T) {
}
require.Equal(t, mTokenGenerator.called, len(zones))

// Test generating tokens for an ingester that already have token
rg := NewRandomTokenGenerator()
rTokens := rg.GenerateTokens(rindDesc, "partial", zones[0], 256, true)
rindDesc.AddIngester("partial", "partial", zones[0], rTokens, ACTIVE, time.Now())
nTokens := minimizeTokenGenerator.GenerateTokens(rindDesc, "partial", zones[0], 256, true)
rindDesc.AddIngester("partial", "partial", zones[0], append(rTokens, nTokens...), ACTIVE, time.Now())
assertDistancePerIngester(t, rindDesc, 0.01)

mTokenGenerator.called = 0
// Should fallback to random generator when more than 1 ingester does not have tokens and force flag is set
rindDesc.AddIngester("pendingIngester-1", "pendingIngester-1", zones[0], []uint32{}, PENDING, time.Now())
Expand Down

0 comments on commit 009f61e

Please sign in to comment.