Skip to content

Commit

Permalink
Preserve ingester state on restart (#6301)
Browse files Browse the repository at this point in the history
* Expand token file to record previous state before shutting down and join ring with previous state

Signed-off-by: Alex Le <[email protected]>

* Added logs and unit tests

Signed-off-by: Alex Le <[email protected]>

* fix test and add instance state serialization code

Signed-off-by: Alex Le <[email protected]>

* update comments

Signed-off-by: Alex Le <[email protected]>

* addressed comments and added compatibility tests

Signed-off-by: Alex Le <[email protected]>

---------

Signed-off-by: Alex Le <[email protected]>
  • Loading branch information
alexqyle authored Nov 5, 2024
1 parent 661f47b commit 7548068
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 137 deletions.
1 change: 1 addition & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,7 @@ func removeIgnoredLogs(input []string) []string {

ignoredLogStringsRegexList := []*regexp.Regexp{
regexp.MustCompile(`^level=(info|debug|warn) component=cleaner .+$`),
regexp.MustCompile(`^level=info component=compactor msg="set state" .+$`),
}

out := make([]string, 0, len(input))
Expand Down
6 changes: 4 additions & 2 deletions pkg/ring/basic_lifecycler_delegates.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife
return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}

tokensFromFile, err := LoadTokensFromFile(d.tokensPath)
tokenFile, err := LoadTokenFile(d.tokensPath)
if err != nil {
if !os.IsNotExist(err) {
level.Error(d.logger).Log("msg", "error loading tokens from file", "err", err)
}

return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc)
}
tokensFromFile := tokenFile.Tokens

// Signal the next delegate that the tokens have been loaded, miming the
// case the instance exist in the ring (which is OK because the lifecycler
Expand All @@ -94,7 +95,8 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife

func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) {
if d.tokensPath != "" {
if err := tokens.StoreToFile(d.tokensPath); err != nil {
tokenFile := TokenFile{Tokens: tokens}
if err := tokenFile.StoreToFile(d.tokensPath); err != nil {
level.Error(d.logger).Log("msg", "error storing tokens to disk", "path", d.tokensPath, "err", err)
}
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/ring/basic_lifecycler_delegates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,23 @@ func TestTokensPersistencyDelegate_ShouldSkipTokensLoadingIfFileDoesNotExist(t *
require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler))

// Ensure tokens have been stored.
actualTokens, err := LoadTokensFromFile(tokensFile.Name())
tokenFile, err := LoadTokenFile(tokensFile.Name())
require.NoError(t, err)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, actualTokens)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, tokenFile.Tokens)

// Ensure no error has been logged.
assert.Empty(t, logs.String())
}

func TestTokensPersistencyDelegate_ShouldLoadTokensFromFileIfFileExist(t *testing.T) {
func TestTokensPersistencyDelegate_ShouldLoadTokenFileIfFileExist(t *testing.T) {
tokensFile, err := os.CreateTemp("", "tokens-*")
require.NoError(t, err)
defer os.Remove(tokensFile.Name()) //nolint:errcheck

// Store some tokens to the file.
storedTokens := Tokens{6, 7, 8, 9, 10}
require.NoError(t, storedTokens.StoreToFile(tokensFile.Name()))
tokenFile1 := TokenFile{Tokens: storedTokens}
require.NoError(t, tokenFile1.StoreToFile(tokensFile.Name()))

testDelegate := &mockDelegate{
onRegister: func(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) {
Expand Down Expand Up @@ -113,9 +114,9 @@ func TestTokensPersistencyDelegate_ShouldLoadTokensFromFileIfFileExist(t *testin
require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler))

// Ensure we can still read back the tokens file.
actualTokens, err := LoadTokensFromFile(tokensFile.Name())
tokenFile, err := LoadTokenFile(tokensFile.Name())
require.NoError(t, err)
assert.Equal(t, storedTokens, actualTokens)
assert.Equal(t, storedTokens, tokenFile.Tokens)
}

func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheRing(t *testing.T) {
Expand Down Expand Up @@ -150,7 +151,8 @@ func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheR
defer os.Remove(tokensFile.Name()) //nolint:errcheck

// Store some tokens to the file.
require.NoError(t, storedTokens.StoreToFile(tokensFile.Name()))
tokenFile1 := TokenFile{Tokens: storedTokens}
require.NoError(t, tokenFile1.StoreToFile(tokensFile.Name()))

// We assume is already registered to the ring.
registeredAt := time.Now().Add(-time.Hour)
Expand Down Expand Up @@ -226,9 +228,9 @@ func TestDelegatesChain(t *testing.T) {
assert.True(t, onStoppingCalled)

// Ensure tokens have been stored.
actualTokens, err := LoadTokensFromFile(tokensFile.Name())
tokenFile, err := LoadTokenFile(tokensFile.Name())
require.NoError(t, err)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, actualTokens)
assert.Equal(t, Tokens{1, 2, 3, 4, 5}, tokenFile.Tokens)
}

func TestAutoForgetDelegate(t *testing.T) {
Expand Down
88 changes: 71 additions & 17 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ type Lifecycler struct {
// goes away and comes back empty. The state changes during lifecycle of instance.
stateMtx sync.RWMutex
state InstanceState
tokens Tokens
tokenFile *TokenFile
registeredAt time.Time

// Controls the ready-reporting
Expand Down Expand Up @@ -205,6 +205,7 @@ func NewLifecycler(
actorChan: make(chan func()),
autojoinChan: make(chan struct{}, 1),
state: PENDING,
tokenFile: &TokenFile{PreviousState: ACTIVE},
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
tg: tg,
Expand Down Expand Up @@ -301,6 +302,7 @@ func (i *Lifecycler) GetState() InstanceState {
func (i *Lifecycler) setState(state InstanceState) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
level.Info(i.logger).Log("msg", "set state", "old_state", i.state, "new_state", state)
i.state = state
}

Expand Down Expand Up @@ -334,7 +336,7 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error
func (i *Lifecycler) getTokens() Tokens {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
return i.tokens
return i.tokenFile.Tokens
}

func (i *Lifecycler) setTokens(tokens Tokens) {
Expand All @@ -343,14 +345,54 @@ func (i *Lifecycler) setTokens(tokens Tokens) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()

i.tokens = tokens
i.tokenFile.Tokens = tokens
if i.cfg.TokensFilePath != "" {
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
if err := i.tokenFile.StoreToFile(i.cfg.TokensFilePath); err != nil {
level.Error(i.logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
}
}
}

func (i *Lifecycler) getPreviousState() InstanceState {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
return i.tokenFile.PreviousState
}

func (i *Lifecycler) setPreviousState(state InstanceState) {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()

if !(state == ACTIVE || state == READONLY) {
level.Error(i.logger).Log("msg", "cannot store unsupported state to disk", "new_state", state, "old_state", i.tokenFile.PreviousState)
return
}

i.tokenFile.PreviousState = state
if i.cfg.TokensFilePath != "" {
if err := i.tokenFile.StoreToFile(i.cfg.TokensFilePath); err != nil {
level.Error(i.logger).Log("msg", "error storing state to disk", "path", i.cfg.TokensFilePath, "err", err)
} else {
level.Info(i.logger).Log("msg", "saved state to disk", "state", state, "path", i.cfg.TokensFilePath)
}
}
}

func (i *Lifecycler) loadTokenFile() (*TokenFile, error) {

t, err := LoadTokenFile(i.cfg.TokensFilePath)
if err != nil {
return nil, err
}

i.stateMtx.Lock()
defer i.stateMtx.Unlock()

i.tokenFile = t
level.Info(i.logger).Log("msg", "loaded token file", "state", i.tokenFile.PreviousState, "num_tokens", len(i.tokenFile.Tokens), "path", i.cfg.TokensFilePath)
return i.tokenFile, nil
}

func (i *Lifecycler) getRegisteredAt() time.Time {
i.stateMtx.RLock()
defer i.stateMtx.RUnlock()
Expand Down Expand Up @@ -501,8 +543,8 @@ func (i *Lifecycler) loop(ctx context.Context) error {
level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil {
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState())
}
}
}
Expand All @@ -519,9 +561,9 @@ func (i *Lifecycler) loop(ctx context.Context) error {
if i.verifyTokens(context.Background()) {
level.Info(i.logger).Log("msg", "token verification successful", "ring", i.RingName)

err := i.changeState(context.Background(), ACTIVE)
err := i.changeState(context.Background(), i.getPreviousState())
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state to ACTIVE", "ring", i.RingName, "err", err)
level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err)
}
} else {
level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName)
Expand Down Expand Up @@ -564,6 +606,12 @@ func (i *Lifecycler) stopping(runningError error) error {
heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod)
defer heartbeatTickerStop()

// save current state into file
if i.cfg.TokensFilePath != "" {
currentState := i.GetState()
i.setPreviousState(currentState)
}

// Mark ourselved as Leaving so no more samples are send to us.
err := i.changeState(context.Background(), LEAVING)
if err != nil {
Expand Down Expand Up @@ -613,9 +661,13 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
)

if i.cfg.TokensFilePath != "" {
tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
tokenFile, err := i.loadTokenFile()
if err != nil && !os.IsNotExist(err) {
level.Error(i.logger).Log("msg", "error loading tokens from file", "err", err)
level.Error(i.logger).Log("msg", "error loading tokens and previous state from file", "err", err)
}

if tokenFile != nil {
tokensFromFile = tokenFile.Tokens
}
} else {
level.Info(i.logger).Log("msg", "not loading tokens from file, tokens file path is empty")
Expand All @@ -639,7 +691,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
if len(tokensFromFile) > 0 {
level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup {
i.setState(ACTIVE)
i.setState(i.getPreviousState())
}
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt)
i.setTokens(tokensFromFile)
Expand Down Expand Up @@ -669,11 +721,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error {

// If the ingester failed to clean its ring entry up in can leave its state in LEAVING
// OR unregister_on_shutdown=false
// if autoJoinOnStartup, move it into ACTIVE to ensure the ingester joins the ring.
// else set to PENDING
// if autoJoinOnStartup, move it into previous state based on token file (default: ACTIVE)
// to ensure the ingester joins the ring. else set to PENDING
if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) != 0 {
if i.autoJoinOnStartup {
instanceDesc.State = ACTIVE
instanceDesc.State = i.getPreviousState()
} else {
instanceDesc.State = PENDING
}
Expand Down Expand Up @@ -908,10 +960,12 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error {
currState := i.GetState()
// Only the following state transitions can be triggered externally
if !((currState == PENDING && state == JOINING) || // triggered by TransferChunks at the beginning
(currState == JOINING && state == PENDING) || // triggered by TransferChunks on failure
(currState == JOINING && state == ACTIVE) || // triggered by TransferChunks on success
if !((currState == PENDING && state == JOINING) ||
(currState == JOINING && state == PENDING) ||
(currState == JOINING && state == ACTIVE) ||
(currState == JOINING && state == READONLY) ||
(currState == PENDING && state == ACTIVE) || // triggered by autoJoin
(currState == PENDING && state == READONLY) || // triggered by autoJoin
(currState == ACTIVE && state == LEAVING) || // triggered by shutdown
(currState == ACTIVE && state == READONLY) || // triggered by ingester mode
(currState == READONLY && state == ACTIVE) || // triggered by ingester mode
Expand Down
16 changes: 14 additions & 2 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
}

func TestTokensOnDisk(t *testing.T) {
func TestTokenFileOnDisk(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

Expand Down Expand Up @@ -756,6 +756,18 @@ func TestTokensOnDisk(t *testing.T) {
len(desc.Ingesters["ing1"].Tokens) == 512
})

// Change state from ACTIVE to READONLY
err = l1.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
return ok &&
desc.Ingesters["ing1"].State == READONLY
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))

// Start new ingester at same token directory.
Expand All @@ -776,7 +788,7 @@ func TestTokensOnDisk(t *testing.T) {
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
desc.Ingesters["ing2"].State == READONLY &&
len(desc.Ingesters["ing2"].Tokens) == 512
})

Expand Down
79 changes: 79 additions & 0 deletions pkg/ring/token_file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package ring

import (
"encoding/json"
"errors"
"os"
"sort"
)

type TokenFile struct {
PreviousState InstanceState `json:"previousState,omitempty"`
Tokens Tokens `json:"tokens"`
}

// StoreToFile stores the tokens in the given directory.
func (l TokenFile) StoreToFile(tokenFilePath string) error {
if tokenFilePath == "" {
return errors.New("path is empty")
}

// If any operations failed further in the function, we keep the temporary
// file hanging around for debugging.
f, err := os.Create(tokenFilePath + ".tmp")
if err != nil {
return err
}

defer func() {
// If the file was not closed, then there must already be an error, hence ignore
// the error (if any) from f.Close(). If the file was already closed, then
// we would ignore the error in that case too.
_ = f.Close()
}()

b, err := json.Marshal(l)
if err != nil {
return err
}
if _, err = f.Write(b); err != nil {
return err
}

if err := f.Close(); err != nil {
return err
}

// Tokens successfully written, replace the temporary file with the actual file path.
return os.Rename(f.Name(), tokenFilePath)
}

func LoadTokenFile(tokenFilePath string) (*TokenFile, error) {
b, err := os.ReadFile(tokenFilePath)
if err != nil {
return nil, err
}
t := TokenFile{}
err = json.Unmarshal(b, &t)

// Tokens may have been written to file by an older version which
// doesn't guarantee sorted tokens, so we enforce sorting here.
if !sort.IsSorted(t.Tokens) {
sort.Sort(t.Tokens)
}

return &t, err
}

func (p InstanceState) MarshalJSON() ([]byte, error) {
ss := InstanceState_name[int32(p)]
return json.Marshal(ss)
}
func (p *InstanceState) UnmarshalJSON(data []byte) error {
res := ""
if err := json.Unmarshal(data, &res); err != nil {
return err
}
*p = InstanceState(InstanceState_value[res])
return nil
}
Loading

0 comments on commit 7548068

Please sign in to comment.