Skip to content

Commit

Permalink
Allowing renenerate ingester tokens (#6063)
Browse files Browse the repository at this point in the history
* Allowing renenerate ingester tokens

Signed-off-by: alanprot <[email protected]>

* Changelog + lint

Signed-off-by: alanprot <[email protected]>

---------

Signed-off-by: alanprot <[email protected]>
  • Loading branch information
alanprot authored Jul 5, 2024
1 parent d1b6d26 commit b6f0f3c
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [ENHANCEMENT] Upgrade Alpine to 3.19. #6014
* [ENHANCEMENT] Upgrade go to 1.21.11 #6014
* [ENHANCEMENT] Ingester: Add a new experimental `-ingester.labels-string-interning-enabled` flag to enable string interning for metrics labels. #6057
* [ENHANCEMENT] Ingester: Add link to renew 10% of the ingesters tokens in the admin page. #6063
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018
Expand Down
3 changes: 3 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ type Ingester interface {
client.IngesterServer
FlushHandler(http.ResponseWriter, *http.Request)
ShutdownHandler(http.ResponseWriter, *http.Request)
RenewTokenHandler(http.ResponseWriter, *http.Request)
Push(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)
}

Expand All @@ -292,8 +293,10 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config) {

a.indexPage.AddLink(SectionDangerous, "/ingester/flush", "Trigger a Flush of data from Ingester to storage")
a.indexPage.AddLink(SectionDangerous, "/ingester/shutdown", "Trigger Ingester Shutdown (Dangerous)")
a.indexPage.AddLink(SectionDangerous, "/ingester/renewTokens", "Renew Ingester Tokens (10%)")
a.RegisterRoute("/ingester/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.

// Legacy Routes
Expand Down
5 changes: 5 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,11 @@ func (i *Ingester) updateActiveSeries(ctx context.Context) {
}
}

func (i *Ingester) RenewTokenHandler(w http.ResponseWriter, r *http.Request) {
i.lifecycler.RenewTokens(0.1, r.Context())
w.WriteHeader(http.StatusNoContent)
}

// ShutdownHandler triggers the following set of operations in order:
// - Change the state of ring to stop accepting writes.
// - Flush all the chunks.
Expand Down
46 changes: 46 additions & 0 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,52 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
return err
}

func (i *Lifecycler) RenewTokens(ratio float64, ctx context.Context) {
if ratio > 1 {
ratio = 1
}
err := i.KVStore.CAS(ctx, i.RingKey, func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
return in, false, nil
}

ringDesc := in.(*Desc)
_, ok := ringDesc.Ingesters[i.ID]

if !ok {
return in, false, nil
}

tokensToBeRenewed := int(float64(i.cfg.NumTokens) * ratio)
ringTokens, _ := ringDesc.TokensFor(i.ID)

// Removing random tokens
for i := 0; i < tokensToBeRenewed; i++ {
if len(ringTokens) == 0 {
break
}
index := mathrand.Int() % len(ringTokens)
ringTokens = append(ringTokens[:index], ringTokens[index+1:]...)
}

needTokens := i.cfg.NumTokens - len(ringTokens)
level.Info(i.logger).Log("msg", "renewing new tokens", "count", needTokens, "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt())
newTokens := i.tg.GenerateTokens(ringDesc, i.ID, i.Zone, needTokens, true)

ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, ringTokens, i.GetState(), i.getRegisteredAt())
i.setTokens(ringTokens)
return ringDesc, true, nil
})

if err != nil {
level.Error(i.logger).Log("msg", "failed to regenerate tokens", "ring", i.RingName, "err", err)
}
}

// Verifies that tokens that this ingester has registered to the ring still belong to it.
// Gossiping ring may change the ownership of tokens in case of conflicts.
// If ingester doesn't own its tokens anymore, this method generates new tokens and puts them to the ring.
Expand Down
44 changes: 44 additions & 0 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"

"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -75,6 +76,49 @@ func TestLifecycler_JoinShouldNotBlock(t *testing.T) {
}
}

func TestLifecycler_RenewTokens(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

ctx := context.Background()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond
lifecyclerConfig.NumTokens = 512

l1, err := NewLifecycler(lifecyclerConfig, &nopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(ctx, l1))
defer services.StopAndAwaitTerminated(ctx, l1) // nolint:errcheck

waitRingInstance(t, 3*time.Second, l1, func(instance InstanceDesc) error {
if instance.State != ACTIVE {
return errors.New("should be active")
}
return nil
})

originalTokens := l1.getTokens()
require.Len(t, originalTokens, 512)
require.IsIncreasing(t, originalTokens)
l1.RenewTokens(0.1, ctx)
newTokens := l1.getTokens()
require.Len(t, newTokens, 512)
require.IsIncreasing(t, newTokens)
diff := 0
for i := 0; i < len(originalTokens); i++ {
if !slices.Contains(originalTokens, newTokens[i]) {
diff++
}
}

require.Equal(t, 51, diff)
}

func TestLifecycler_DefferedJoin(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down

0 comments on commit b6f0f3c

Please sign in to comment.