From 7548068faa179ddf075ba11677f72327892ceb4b Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 5 Nov 2024 10:42:44 -0800 Subject: [PATCH] Preserve ingester state on restart (#6301) * Expand token file to record previous state before shutting down and join ring with previous state Signed-off-by: Alex Le * Added logs and unit tests Signed-off-by: Alex Le * fix test and add instance state serialization code Signed-off-by: Alex Le * update comments Signed-off-by: Alex Le * addressed comments and added compatibility tests Signed-off-by: Alex Le --------- Signed-off-by: Alex Le --- pkg/compactor/compactor_test.go | 1 + pkg/ring/basic_lifecycler_delegates.go | 6 +- pkg/ring/basic_lifecycler_delegates_test.go | 20 +++-- pkg/ring/lifecycler.go | 88 +++++++++++++++---- pkg/ring/lifecycler_test.go | 16 +++- pkg/ring/token_file.go | 79 +++++++++++++++++ pkg/ring/token_file_test.go | 97 +++++++++++++++++++++ pkg/ring/tokens.go | 76 ---------------- pkg/ring/tokens_test.go | 30 ------- pkg/storegateway/gateway_test.go | 3 +- 10 files changed, 279 insertions(+), 137 deletions(-) create mode 100644 pkg/ring/token_file.go create mode 100644 pkg/ring/token_file_test.go diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index cf76ff735a..7da85dc856 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1476,6 +1476,7 @@ func removeIgnoredLogs(input []string) []string { ignoredLogStringsRegexList := []*regexp.Regexp{ regexp.MustCompile(`^level=(info|debug|warn) component=cleaner .+$`), + regexp.MustCompile(`^level=info component=compactor msg="set state" .+$`), } out := make([]string, 0, len(input)) diff --git a/pkg/ring/basic_lifecycler_delegates.go b/pkg/ring/basic_lifecycler_delegates.go index 26e3cfa41d..a3d083936a 100644 --- a/pkg/ring/basic_lifecycler_delegates.go +++ b/pkg/ring/basic_lifecycler_delegates.go @@ -70,7 +70,7 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc) } - tokensFromFile, err := LoadTokensFromFile(d.tokensPath) + tokenFile, err := LoadTokenFile(d.tokensPath) if err != nil { if !os.IsNotExist(err) { level.Error(d.logger).Log("msg", "error loading tokens from file", "err", err) @@ -78,6 +78,7 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife return d.next.OnRingInstanceRegister(lifecycler, ringDesc, instanceExists, instanceID, instanceDesc) } + tokensFromFile := tokenFile.Tokens // Signal the next delegate that the tokens have been loaded, miming the // case the instance exist in the ring (which is OK because the lifecycler @@ -94,7 +95,8 @@ func (d *TokensPersistencyDelegate) OnRingInstanceRegister(lifecycler *BasicLife func (d *TokensPersistencyDelegate) OnRingInstanceTokens(lifecycler *BasicLifecycler, tokens Tokens) { if d.tokensPath != "" { - if err := tokens.StoreToFile(d.tokensPath); err != nil { + tokenFile := TokenFile{Tokens: tokens} + if err := tokenFile.StoreToFile(d.tokensPath); err != nil { level.Error(d.logger).Log("msg", "error storing tokens to disk", "path", d.tokensPath, "err", err) } } diff --git a/pkg/ring/basic_lifecycler_delegates_test.go b/pkg/ring/basic_lifecycler_delegates_test.go index 67a22ffa94..1a81233ac9 100644 --- a/pkg/ring/basic_lifecycler_delegates_test.go +++ b/pkg/ring/basic_lifecycler_delegates_test.go @@ -69,22 +69,23 @@ func TestTokensPersistencyDelegate_ShouldSkipTokensLoadingIfFileDoesNotExist(t * require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler)) // Ensure tokens have been stored. - actualTokens, err := LoadTokensFromFile(tokensFile.Name()) + tokenFile, err := LoadTokenFile(tokensFile.Name()) require.NoError(t, err) - assert.Equal(t, Tokens{1, 2, 3, 4, 5}, actualTokens) + assert.Equal(t, Tokens{1, 2, 3, 4, 5}, tokenFile.Tokens) // Ensure no error has been logged. assert.Empty(t, logs.String()) } -func TestTokensPersistencyDelegate_ShouldLoadTokensFromFileIfFileExist(t *testing.T) { +func TestTokensPersistencyDelegate_ShouldLoadTokenFileIfFileExist(t *testing.T) { tokensFile, err := os.CreateTemp("", "tokens-*") require.NoError(t, err) defer os.Remove(tokensFile.Name()) //nolint:errcheck // Store some tokens to the file. storedTokens := Tokens{6, 7, 8, 9, 10} - require.NoError(t, storedTokens.StoreToFile(tokensFile.Name())) + tokenFile1 := TokenFile{Tokens: storedTokens} + require.NoError(t, tokenFile1.StoreToFile(tokensFile.Name())) testDelegate := &mockDelegate{ onRegister: func(lifecycler *BasicLifecycler, ringDesc Desc, instanceExists bool, instanceID string, instanceDesc InstanceDesc) (InstanceState, Tokens) { @@ -113,9 +114,9 @@ func TestTokensPersistencyDelegate_ShouldLoadTokensFromFileIfFileExist(t *testin require.NoError(t, services.StopAndAwaitTerminated(ctx, lifecycler)) // Ensure we can still read back the tokens file. - actualTokens, err := LoadTokensFromFile(tokensFile.Name()) + tokenFile, err := LoadTokenFile(tokensFile.Name()) require.NoError(t, err) - assert.Equal(t, storedTokens, actualTokens) + assert.Equal(t, storedTokens, tokenFile.Tokens) } func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheRing(t *testing.T) { @@ -150,7 +151,8 @@ func TestTokensPersistencyDelegate_ShouldHandleTheCaseTheInstanceIsAlreadyInTheR defer os.Remove(tokensFile.Name()) //nolint:errcheck // Store some tokens to the file. - require.NoError(t, storedTokens.StoreToFile(tokensFile.Name())) + tokenFile1 := TokenFile{Tokens: storedTokens} + require.NoError(t, tokenFile1.StoreToFile(tokensFile.Name())) // We assume is already registered to the ring. registeredAt := time.Now().Add(-time.Hour) @@ -226,9 +228,9 @@ func TestDelegatesChain(t *testing.T) { assert.True(t, onStoppingCalled) // Ensure tokens have been stored. - actualTokens, err := LoadTokensFromFile(tokensFile.Name()) + tokenFile, err := LoadTokenFile(tokensFile.Name()) require.NoError(t, err) - assert.Equal(t, Tokens{1, 2, 3, 4, 5}, actualTokens) + assert.Equal(t, Tokens{1, 2, 3, 4, 5}, tokenFile.Tokens) } func TestAutoForgetDelegate(t *testing.T) { diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index e9b9970e1a..8ad3b494da 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -130,7 +130,7 @@ type Lifecycler struct { // goes away and comes back empty. The state changes during lifecycle of instance. stateMtx sync.RWMutex state InstanceState - tokens Tokens + tokenFile *TokenFile registeredAt time.Time // Controls the ready-reporting @@ -205,6 +205,7 @@ func NewLifecycler( actorChan: make(chan func()), autojoinChan: make(chan struct{}, 1), state: PENDING, + tokenFile: &TokenFile{PreviousState: ACTIVE}, lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), logger: logger, tg: tg, @@ -301,6 +302,7 @@ func (i *Lifecycler) GetState() InstanceState { func (i *Lifecycler) setState(state InstanceState) { i.stateMtx.Lock() defer i.stateMtx.Unlock() + level.Info(i.logger).Log("msg", "set state", "old_state", i.state, "new_state", state) i.state = state } @@ -334,7 +336,7 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state InstanceState) error func (i *Lifecycler) getTokens() Tokens { i.stateMtx.RLock() defer i.stateMtx.RUnlock() - return i.tokens + return i.tokenFile.Tokens } func (i *Lifecycler) setTokens(tokens Tokens) { @@ -343,14 +345,54 @@ func (i *Lifecycler) setTokens(tokens Tokens) { i.stateMtx.Lock() defer i.stateMtx.Unlock() - i.tokens = tokens + i.tokenFile.Tokens = tokens if i.cfg.TokensFilePath != "" { - if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil { + if err := i.tokenFile.StoreToFile(i.cfg.TokensFilePath); err != nil { level.Error(i.logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err) } } } +func (i *Lifecycler) getPreviousState() InstanceState { + i.stateMtx.RLock() + defer i.stateMtx.RUnlock() + return i.tokenFile.PreviousState +} + +func (i *Lifecycler) setPreviousState(state InstanceState) { + i.stateMtx.Lock() + defer i.stateMtx.Unlock() + + if !(state == ACTIVE || state == READONLY) { + level.Error(i.logger).Log("msg", "cannot store unsupported state to disk", "new_state", state, "old_state", i.tokenFile.PreviousState) + return + } + + i.tokenFile.PreviousState = state + if i.cfg.TokensFilePath != "" { + if err := i.tokenFile.StoreToFile(i.cfg.TokensFilePath); err != nil { + level.Error(i.logger).Log("msg", "error storing state to disk", "path", i.cfg.TokensFilePath, "err", err) + } else { + level.Info(i.logger).Log("msg", "saved state to disk", "state", state, "path", i.cfg.TokensFilePath) + } + } +} + +func (i *Lifecycler) loadTokenFile() (*TokenFile, error) { + + t, err := LoadTokenFile(i.cfg.TokensFilePath) + if err != nil { + return nil, err + } + + i.stateMtx.Lock() + defer i.stateMtx.Unlock() + + i.tokenFile = t + level.Info(i.logger).Log("msg", "loaded token file", "state", i.tokenFile.PreviousState, "num_tokens", len(i.tokenFile.Tokens), "path", i.cfg.TokensFilePath) + return i.tokenFile, nil +} + func (i *Lifecycler) getRegisteredAt() time.Time { i.stateMtx.RLock() defer i.stateMtx.RUnlock() @@ -501,8 +543,8 @@ func (i *Lifecycler) loop(ctx context.Context) error { level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName) observeChan = time.After(i.cfg.ObservePeriod) } else { - if err := i.autoJoin(context.Background(), ACTIVE); err != nil { - return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + if err := i.autoJoin(context.Background(), i.getPreviousState()); err != nil { + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s, state: %s", i.RingName, i.getPreviousState()) } } } @@ -519,9 +561,9 @@ func (i *Lifecycler) loop(ctx context.Context) error { if i.verifyTokens(context.Background()) { level.Info(i.logger).Log("msg", "token verification successful", "ring", i.RingName) - err := i.changeState(context.Background(), ACTIVE) + err := i.changeState(context.Background(), i.getPreviousState()) if err != nil { - level.Error(i.logger).Log("msg", "failed to set state to ACTIVE", "ring", i.RingName, "err", err) + level.Error(i.logger).Log("msg", "failed to set state", "ring", i.RingName, "state", i.getPreviousState(), "err", err) } } else { level.Info(i.logger).Log("msg", "token verification failed, observing", "ring", i.RingName) @@ -564,6 +606,12 @@ func (i *Lifecycler) stopping(runningError error) error { heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod) defer heartbeatTickerStop() + // save current state into file + if i.cfg.TokensFilePath != "" { + currentState := i.GetState() + i.setPreviousState(currentState) + } + // Mark ourselved as Leaving so no more samples are send to us. err := i.changeState(context.Background(), LEAVING) if err != nil { @@ -613,9 +661,13 @@ func (i *Lifecycler) initRing(ctx context.Context) error { ) if i.cfg.TokensFilePath != "" { - tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath) + tokenFile, err := i.loadTokenFile() if err != nil && !os.IsNotExist(err) { - level.Error(i.logger).Log("msg", "error loading tokens from file", "err", err) + level.Error(i.logger).Log("msg", "error loading tokens and previous state from file", "err", err) + } + + if tokenFile != nil { + tokensFromFile = tokenFile.Tokens } } else { level.Info(i.logger).Log("msg", "not loading tokens from file, tokens file path is empty") @@ -639,7 +691,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error { if len(tokensFromFile) > 0 { level.Info(i.logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile)) if len(tokensFromFile) >= i.cfg.NumTokens && i.autoJoinOnStartup { - i.setState(ACTIVE) + i.setState(i.getPreviousState()) } ringDesc.AddIngester(i.ID, i.Addr, i.Zone, tokensFromFile, i.GetState(), registeredAt) i.setTokens(tokensFromFile) @@ -669,11 +721,11 @@ func (i *Lifecycler) initRing(ctx context.Context) error { // If the ingester failed to clean its ring entry up in can leave its state in LEAVING // OR unregister_on_shutdown=false - // if autoJoinOnStartup, move it into ACTIVE to ensure the ingester joins the ring. - // else set to PENDING + // if autoJoinOnStartup, move it into previous state based on token file (default: ACTIVE) + // to ensure the ingester joins the ring. else set to PENDING if instanceDesc.State == LEAVING && len(instanceDesc.Tokens) != 0 { if i.autoJoinOnStartup { - instanceDesc.State = ACTIVE + instanceDesc.State = i.getPreviousState() } else { instanceDesc.State = PENDING } @@ -908,10 +960,12 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error { currState := i.GetState() // Only the following state transitions can be triggered externally - if !((currState == PENDING && state == JOINING) || // triggered by TransferChunks at the beginning - (currState == JOINING && state == PENDING) || // triggered by TransferChunks on failure - (currState == JOINING && state == ACTIVE) || // triggered by TransferChunks on success + if !((currState == PENDING && state == JOINING) || + (currState == JOINING && state == PENDING) || + (currState == JOINING && state == ACTIVE) || + (currState == JOINING && state == READONLY) || (currState == PENDING && state == ACTIVE) || // triggered by autoJoin + (currState == PENDING && state == READONLY) || // triggered by autoJoin (currState == ACTIVE && state == LEAVING) || // triggered by shutdown (currState == ACTIVE && state == READONLY) || // triggered by ingester mode (currState == READONLY && state == ACTIVE) || // triggered by ingester mode diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 0b8a6402db..cdd684dd96 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -716,7 +716,7 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testi require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2)) } -func TestTokensOnDisk(t *testing.T) { +func TestTokenFileOnDisk(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -756,6 +756,18 @@ func TestTokensOnDisk(t *testing.T) { len(desc.Ingesters["ing1"].Tokens) == 512 }) + // Change state from ACTIVE to READONLY + err = l1.ChangeState(context.Background(), READONLY) + require.NoError(t, err) + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + desc.Ingesters["ing1"].State == READONLY + }) + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1)) // Start new ingester at same token directory. @@ -776,7 +788,7 @@ func TestTokensOnDisk(t *testing.T) { } return ok && len(desc.Ingesters) == 1 && - desc.Ingesters["ing2"].State == ACTIVE && + desc.Ingesters["ing2"].State == READONLY && len(desc.Ingesters["ing2"].Tokens) == 512 }) diff --git a/pkg/ring/token_file.go b/pkg/ring/token_file.go new file mode 100644 index 0000000000..328c0fa13d --- /dev/null +++ b/pkg/ring/token_file.go @@ -0,0 +1,79 @@ +package ring + +import ( + "encoding/json" + "errors" + "os" + "sort" +) + +type TokenFile struct { + PreviousState InstanceState `json:"previousState,omitempty"` + Tokens Tokens `json:"tokens"` +} + +// StoreToFile stores the tokens in the given directory. +func (l TokenFile) StoreToFile(tokenFilePath string) error { + if tokenFilePath == "" { + return errors.New("path is empty") + } + + // If any operations failed further in the function, we keep the temporary + // file hanging around for debugging. + f, err := os.Create(tokenFilePath + ".tmp") + if err != nil { + return err + } + + defer func() { + // If the file was not closed, then there must already be an error, hence ignore + // the error (if any) from f.Close(). If the file was already closed, then + // we would ignore the error in that case too. + _ = f.Close() + }() + + b, err := json.Marshal(l) + if err != nil { + return err + } + if _, err = f.Write(b); err != nil { + return err + } + + if err := f.Close(); err != nil { + return err + } + + // Tokens successfully written, replace the temporary file with the actual file path. + return os.Rename(f.Name(), tokenFilePath) +} + +func LoadTokenFile(tokenFilePath string) (*TokenFile, error) { + b, err := os.ReadFile(tokenFilePath) + if err != nil { + return nil, err + } + t := TokenFile{} + err = json.Unmarshal(b, &t) + + // Tokens may have been written to file by an older version which + // doesn't guarantee sorted tokens, so we enforce sorting here. + if !sort.IsSorted(t.Tokens) { + sort.Sort(t.Tokens) + } + + return &t, err +} + +func (p InstanceState) MarshalJSON() ([]byte, error) { + ss := InstanceState_name[int32(p)] + return json.Marshal(ss) +} +func (p *InstanceState) UnmarshalJSON(data []byte) error { + res := "" + if err := json.Unmarshal(data, &res); err != nil { + return err + } + *p = InstanceState(InstanceState_value[res]) + return nil +} diff --git a/pkg/ring/token_file_test.go b/pkg/ring/token_file_test.go new file mode 100644 index 0000000000..a456da5afe --- /dev/null +++ b/pkg/ring/token_file_test.go @@ -0,0 +1,97 @@ +package ring + +import ( + "encoding/json" + "math/rand" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTokenFile_Serialization(t *testing.T) { + tokens := make(Tokens, 0, 512) + for i := 0; i < 512; i++ { + tokens = append(tokens, uint32(rand.Int31())) + } + tokenFile := TokenFile{ + PreviousState: READONLY, + Tokens: tokens, + } + b, err := json.Marshal(tokenFile) + require.NoError(t, err) + + unmarshaledTokenFile := TokenFile{} + require.NoError(t, json.Unmarshal(b, &unmarshaledTokenFile)) + require.Equal(t, tokens, unmarshaledTokenFile.Tokens) + require.Equal(t, READONLY, unmarshaledTokenFile.PreviousState) +} + +func TestTokenFile_Serialization_ForwardCompatibility(t *testing.T) { + tokens := make(Tokens, 0, 512) + for i := 0; i < 512; i++ { + tokens = append(tokens, uint32(rand.Int31())) + } + b, err := oldMarshal(tokens) + require.NoError(t, err) + + unmarshaledTokenFile := TokenFile{} + require.NoError(t, json.Unmarshal(b, &unmarshaledTokenFile)) + require.Equal(t, tokens, unmarshaledTokenFile.Tokens) + require.Equal(t, ACTIVE, unmarshaledTokenFile.PreviousState) +} + +func TestTokenFile_Serialization_BackwardCompatibility(t *testing.T) { + tokens := make(Tokens, 0, 512) + for i := 0; i < 512; i++ { + tokens = append(tokens, uint32(rand.Int31())) + } + tokenFile := TokenFile{ + PreviousState: READONLY, + Tokens: tokens, + } + b, err := json.Marshal(tokenFile) + require.NoError(t, err) + + unmarshaledTokens := Tokens{} + require.NoError(t, oldUnmarshal(b, &unmarshaledTokens)) + require.Equal(t, tokens, unmarshaledTokens) +} + +func TestLoadTokenFile_ShouldGuaranteeSortedTokens(t *testing.T) { + tmpDir := t.TempDir() + + // Store tokens to file. + origTokens := Tokens{1, 5, 3} + orig := TokenFile{ + Tokens: origTokens, + } + + require.NoError(t, orig.StoreToFile(filepath.Join(tmpDir, "tokens"))) + + // Read back and ensure they're sorted. + actual, err := LoadTokenFile(filepath.Join(tmpDir, "tokens")) + require.NoError(t, err) + assert.Equal(t, Tokens{1, 3, 5}, actual.Tokens) +} + +// Copied from removed code for compatibility test +func oldMarshal(t Tokens) ([]byte, error) { + return json.Marshal(tokensJSON{Tokens: t}) +} + +// Copied from removed code for compatibility test +func oldUnmarshal(b []byte, t *Tokens) error { + tj := tokensJSON{} + if err := json.Unmarshal(b, &tj); err != nil { + return err + } + *t = tj.Tokens + return nil +} + +// Copied from removed code for compatibility test +type tokensJSON struct { + Tokens []uint32 `json:"tokens"` +} diff --git a/pkg/ring/tokens.go b/pkg/ring/tokens.go index cf4999ff5d..28c57d460d 100644 --- a/pkg/ring/tokens.go +++ b/pkg/ring/tokens.go @@ -1,9 +1,6 @@ package ring import ( - "encoding/json" - "errors" - "os" "sort" ) @@ -32,76 +29,3 @@ func (t Tokens) Equals(other Tokens) bool { return true } - -// StoreToFile stores the tokens in the given directory. -func (t Tokens) StoreToFile(tokenFilePath string) error { - if tokenFilePath == "" { - return errors.New("path is empty") - } - - // If any operations failed further in the function, we keep the temporary - // file hanging around for debugging. - f, err := os.Create(tokenFilePath + ".tmp") - if err != nil { - return err - } - - defer func() { - // If the file was not closed, then there must already be an error, hence ignore - // the error (if any) from f.Close(). If the file was already closed, then - // we would ignore the error in that case too. - _ = f.Close() - }() - - b, err := t.Marshal() - if err != nil { - return err - } - if _, err = f.Write(b); err != nil { - return err - } - - if err := f.Close(); err != nil { - return err - } - - // Tokens successfully written, replace the temporary file with the actual file path. - return os.Rename(f.Name(), tokenFilePath) -} - -// LoadTokensFromFile loads tokens from given file path. -func LoadTokensFromFile(tokenFilePath string) (Tokens, error) { - b, err := os.ReadFile(tokenFilePath) - if err != nil { - return nil, err - } - var t Tokens - err = t.Unmarshal(b) - - // Tokens may have been written to file by an older version which - // doesn't guarantee sorted tokens, so we enforce sorting here. - if !sort.IsSorted(t) { - sort.Sort(t) - } - - return t, err -} - -// Marshal encodes the tokens into JSON. -func (t Tokens) Marshal() ([]byte, error) { - return json.Marshal(tokensJSON{Tokens: t}) -} - -// Unmarshal reads the tokens from JSON byte stream. -func (t *Tokens) Unmarshal(b []byte) error { - tj := tokensJSON{} - if err := json.Unmarshal(b, &tj); err != nil { - return err - } - *t = tj.Tokens - return nil -} - -type tokensJSON struct { - Tokens []uint32 `json:"tokens"` -} diff --git a/pkg/ring/tokens_test.go b/pkg/ring/tokens_test.go index 58822844d8..0aa54c4c7d 100644 --- a/pkg/ring/tokens_test.go +++ b/pkg/ring/tokens_test.go @@ -1,28 +1,11 @@ package ring import ( - "math/rand" - "path/filepath" "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -func TestTokens_Serialization(t *testing.T) { - tokens := make(Tokens, 0, 512) - for i := 0; i < 512; i++ { - tokens = append(tokens, uint32(rand.Int31())) - } - - b, err := tokens.Marshal() - require.NoError(t, err) - - var unmarshaledTokens Tokens - require.NoError(t, unmarshaledTokens.Unmarshal(b)) - require.Equal(t, tokens, unmarshaledTokens) -} - func TestTokens_Equals(t *testing.T) { tests := []struct { first Tokens @@ -56,16 +39,3 @@ func TestTokens_Equals(t *testing.T) { assert.Equal(t, c.expected, c.second.Equals(c.first)) } } - -func TestLoadTokensFromFile_ShouldGuaranteeSortedTokens(t *testing.T) { - tmpDir := t.TempDir() - - // Store tokens to file. - orig := Tokens{1, 5, 3} - require.NoError(t, orig.StoreToFile(filepath.Join(tmpDir, "tokens"))) - - // Read back and ensure they're sorted. - actual, err := LoadTokensFromFile(filepath.Join(tmpDir, "tokens")) - require.NoError(t, err) - assert.Equal(t, Tokens{1, 3, 5}, actual) -} diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index b68fe3c707..3102638192 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -589,7 +589,8 @@ func TestStoreGateway_ShouldSupportLoadRingTokensFromFile(t *testing.T) { defer os.Remove(tokensFile.Name()) //nolint:errcheck // Store some tokens to the file. - require.NoError(t, testData.storedTokens.StoreToFile(tokensFile.Name())) + tokenFile := ring.TokenFile{Tokens: testData.storedTokens} + require.NoError(t, tokenFile.StoreToFile(tokensFile.Name())) ctx := context.Background() gatewayCfg := mockGatewayConfig()